NIFI-1165: Use FileChannel instead of RandomAccessFile in order to avoid 
locking files on Windows

Reviewed by Tony Kurc ([email protected])


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e862f7ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e862f7ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e862f7ff

Branch: refs/heads/NIFI-655
Commit: e862f7ff0393f60ba96afd9c97e756c745f0a19b
Parents: 1e5cc07
Author: Mark Payne <[email protected]>
Authored: Mon Nov 16 09:59:59 2015 -0500
Committer: Tony Kurc <[email protected]>
Committed: Thu Nov 19 01:01:28 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 123 +++++++++++++------
 .../nifi/processors/standard/TestTailFile.java  |  21 ++--
 2 files changed, 100 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e862f7ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 684e3cc..1c32c00 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -27,10 +27,12 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -50,6 +52,7 @@ import 
org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -120,7 +123,7 @@ public class TailFile extends AbstractProcessor {
         .description("All FlowFiles are routed to this Relationship.")
         .build();
 
-    private volatile TailFileState state = new TailFileState(null, null, null, 
0L, 0L, null, new byte[65536]);
+    private volatile TailFileState state = new TailFileState(null, null, null, 
0L, 0L, null, ByteBuffer.allocate(65536));
     private volatile boolean recoveredRolledFiles = false;
     private volatile Long expectedRecoveryChecksum;
     private volatile boolean tailFileChanged = false;
@@ -143,7 +146,7 @@ public class TailFile extends AbstractProcessor {
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (FILENAME.equals(descriptor)) {
-            state = new TailFileState(newValue, null, null, 0L, 0L, null, new 
byte[65536]);
+            state = new TailFileState(newValue, null, null, 0L, 0L, null, 
ByteBuffer.allocate(65536));
             recoveredRolledFiles = false;
             tailFileChanged = true;
         } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
@@ -173,7 +176,7 @@ public class TailFile extends AbstractProcessor {
                 final long timestamp = dis.readLong();
                 final boolean checksumPresent = dis.readBoolean();
 
-                RandomAccessFile reader = null;
+                FileChannel reader = null;
                 File tailFile = null;
 
                 if (checksumPresent && tailFilename.equals(filename)) {
@@ -198,19 +201,21 @@ public class TailFile extends AbstractProcessor {
                                 // beginning of the file.
                                 getLogger().debug("When recovering state, 
checksum of tailed file matches the stored checksum. Will resume where left 
off.");
                                 tailFile = existingTailFile;
-                                reader = new RandomAccessFile(tailFile, "r");
-                                reader.seek(position);
+                                reader = FileChannel.open(tailFile.toPath(), 
StandardOpenOption.READ);
+                                getLogger().debug("Created RandomAccessFile {} 
for {} in recoverState", new Object[] {reader, tailFile});
+
+                                reader.position(position);
                             } else {
                                 getLogger().debug("When recovering state, 
checksum of tailed file does not match the stored checksum. Will begin tailing 
current file from beginning.");
                             }
                         }
                     }
 
-                    state = new TailFileState(tailFilename, tailFile, reader, 
position, timestamp, checksum, new byte[65536]);
+                    state = new TailFileState(tailFilename, tailFile, reader, 
position, timestamp, checksum, ByteBuffer.allocate(65536));
                 } else {
                     expectedRecoveryChecksum = null;
                     // tailing a new file since the state file was written 
out. We will reset state.
-                    state = new TailFileState(tailFilename, null, null, 0L, 
0L, null, new byte[65536]);
+                    state = new TailFileState(tailFilename, null, null, 0L, 
0L, null, ByteBuffer.allocate(65536));
                 }
 
                 getLogger().debug("Recovered state {}", new Object[] {state});
@@ -222,6 +227,30 @@ public class TailFile extends AbstractProcessor {
     }
 
 
+    @OnStopped
+    public void cleanup() {
+        final TailFileState state = this.state;
+        if (state == null) {
+            return;
+        }
+
+        final FileChannel channel = state.getReader();
+        if (channel == null) {
+            return;
+        }
+
+        try {
+            channel.close();
+        } catch (final IOException ioe) {
+            getLogger().warn("Failed to close file handle during cleanup");
+        }
+
+        getLogger().debug("Closed FileChannel {}", new Object[] {channel});
+
+        this.state = new TailFileState(state.getFilename(), state.getFile(), 
null, state.getPosition(), state.getTimestamp(), state.getChecksum(), 
state.getBuffer());
+    }
+
+
     public void persistState(final TailFileState state, final String 
stateFilename) throws IOException {
         getLogger().debug("Persisting state {} to {}", new Object[] {state, 
stateFilename});
 
@@ -247,23 +276,26 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
-    private RandomAccessFile createReader(final File file, final long 
position) {
-        final RandomAccessFile reader;
+    private FileChannel createReader(final File file, final long position) {
+        final FileChannel reader;
 
         try {
-            reader = new RandomAccessFile(file, "r");
-        } catch (final FileNotFoundException fnfe) {
-            getLogger().warn("File {} does not exist; will attempt to access 
file again after the configured Yield Duration has elapsed", new Object[] 
{file});
+            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+        } catch (final IOException ioe) {
+            getLogger().warn("Could not open {} due to {}; will attempt to 
access file again after the configured Yield Duration has elapsed", new 
Object[] {file, ioe});
             return null;
         }
 
+        getLogger().debug("Created RandomAccessFile {} for {}", new Object[] 
{reader, file});
+
         try {
-            reader.seek(position);
+            reader.position(position);
         } catch (final IOException ioe) {
             getLogger().error("Failed to read from {} due to {}", new Object[] 
{file, ioe});
 
             try {
                 reader.close();
+                getLogger().debug("Closed RandomAccessFile {}", new Object[] 
{reader});
             } catch (final IOException ioe2) {
             }
 
@@ -321,6 +353,7 @@ public class TailFile extends AbstractProcessor {
                         if (flowFile.getSize() == 0L) {
                             session.remove(flowFile);
                             // use a timestamp of lastModified() + 1 so that 
we do not ingest this file again.
+                            cleanup();
                             state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
                         } else {
                             flowFile = session.putAttribute(flowFile, 
"filename", firstFile.getName());
@@ -330,6 +363,7 @@ public class TailFile extends AbstractProcessor {
                             session.transfer(flowFile, REL_SUCCESS);
 
                             // use a timestamp of lastModified() + 1 so that 
we do not ingest this file again.
+                            cleanup();
                             state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
 
                             // must ensure that we do session.commit() before 
persisting state in order to avoid data loss.
@@ -358,6 +392,7 @@ public class TailFile extends AbstractProcessor {
                     session.transfer(flowFile, REL_SUCCESS);
 
                     // use a timestamp of lastModified() + 1 so that we do not 
ingest this file again.
+                    cleanup();
                     state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
file.lastModified() + 1L, null, state.getBuffer());
 
                     // must ensure that we do session.commit() before 
persisting state in order to avoid data loss.
@@ -382,13 +417,16 @@ public class TailFile extends AbstractProcessor {
                 if 
(START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                     recoverRolledFiles(context, session);
                 } else if 
(START_CURRENT_FILE.getValue().equals(recoverPosition)) {
+                    cleanup();
                     state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, 
null, state.getBuffer());
                 } else {
                     final String filename = 
context.getProperty(FILENAME).getValue();
                     final File file = new File(filename);
 
                     try {
-                        final RandomAccessFile raf = new 
RandomAccessFile(file, "r");
+                        final FileChannel channel = 
FileChannel.open(file.toPath(), StandardOpenOption.READ);
+                        getLogger().debug("Created FileChannel {} for {}", new 
Object[] {channel, file});
+
                         final Checksum checksum = new CRC32();
                         final long position = file.length();
                         final long timestamp = file.lastModified();
@@ -398,8 +436,9 @@ public class TailFile extends AbstractProcessor {
                             StreamUtils.copy(in, new NullOutputStream(), 
position);
                         }
 
-                        raf.seek(position);
-                        state = new TailFileState(filename, file, raf, 
position, timestamp, checksum, state.getBuffer());
+                        channel.position(position);
+                        cleanup();
+                        state = new TailFileState(filename, file, channel, 
position, timestamp, checksum, state.getBuffer());
                     } catch (final IOException ioe) {
                         getLogger().error("Attempted to position Reader at 
current position in file {} but failed to do so due to {}", new Object[] {file, 
ioe.toString()}, ioe);
                         context.yield();
@@ -419,7 +458,7 @@ public class TailFile extends AbstractProcessor {
         // the onTrigger method and then create a new state object after we 
finish processing the files.
         TailFileState state = this.state;
         File file = state.getFile();
-        RandomAccessFile reader = state.getReader();
+        FileChannel reader = state.getReader();
         Checksum checksum = state.getChecksum();
         if (checksum == null) {
             checksum = new CRC32();
@@ -441,6 +480,7 @@ public class TailFile extends AbstractProcessor {
 
         // Check if file has rotated
         long fileLength = file.length();
+        final long lastModified = file.lastModified();
         if (fileLength < position) {
             // File has rotated. It's possible that it rotated before we 
finished reading all of the data. As a result, we need
             // to check the last rolled-over file and see if it is longer than 
our position. If so, consume the data past our
@@ -489,6 +529,7 @@ public class TailFile extends AbstractProcessor {
             // Since file has rotated, we close the reader, create a new one, 
and then reset our state.
             try {
                 reader.close();
+                getLogger().debug("Closed RandomAccessFile {}", new Object[] 
{reader, reader});
             } catch (final IOException ioe) {
                 getLogger().warn("Failed to close reader for {} due to {}", 
new Object[] {file, ioe});
             }
@@ -503,12 +544,12 @@ public class TailFile extends AbstractProcessor {
         boolean consumeData = false;
         if (fileLength > position) {
             consumeData = true;
-        } else if (file.lastModified() > timestamp) {
+        } else if (lastModified > timestamp) {
             // This can happen if file is truncated, or is replaced with the 
same amount of data as the old file.
             position = 0;
 
             try {
-                reader.seek(0L);
+                reader.position(0L);
             } catch (final IOException ioe) {
                 getLogger().error("Failed to seek to beginning of file due to 
{}", new Object[] {ioe});
                 context.yield();
@@ -525,7 +566,7 @@ public class TailFile extends AbstractProcessor {
             // data has been written to file. Stream it to a new FlowFile.
             FlowFile flowFile = session.create();
 
-            final RandomAccessFile fileReader = reader;
+            final FileChannel fileReader = reader;
             final LongHolder positionHolder = new LongHolder(position);
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
@@ -539,6 +580,7 @@ public class TailFile extends AbstractProcessor {
             // If there ended up being no data, just remove the FlowFile
             if (flowFile.getSize() == 0) {
                 session.remove(flowFile);
+                getLogger().debug("No data to consume; removed created 
FlowFile");
             } else {
                 // determine filename for FlowFile by using <base filename of 
log file>.<initial offset>-<final offset>.<extension>
                 final String tailFilename = file.getName();
@@ -559,7 +601,8 @@ public class TailFile extends AbstractProcessor {
                     TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
                 session.transfer(flowFile, REL_SUCCESS);
                 position = positionHolder.get();
-                timestamp = System.currentTimeMillis();
+                timestamp = lastModified;
+                getLogger().debug("Created {} and routed to success", new 
Object[] {flowFile});
             }
         }
 
@@ -571,6 +614,7 @@ public class TailFile extends AbstractProcessor {
             // no data to consume so rather than continually running, yield to 
allow other processors to use the thread.
             // In this case, the state should not have changed, and we will 
have created no FlowFiles, so we don't have to
             // persist the state or commit the session; instead, just return 
here.
+            getLogger().debug("No data to consume; created no FlowFiles");
             context.yield();
             return;
         }
@@ -598,17 +642,20 @@ public class TailFile extends AbstractProcessor {
      * @return The new position after the lines have been read
      * @throws java.io.IOException if an I/O error occurs.
      */
-    private long readLines(final RandomAccessFile reader, final byte[] buffer, 
final OutputStream out, final Checksum checksum) throws IOException {
+    private long readLines(final FileChannel reader, final ByteBuffer buffer, 
final OutputStream out, final Checksum checksum) throws IOException {
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            long pos = reader.getFilePointer();
+            long pos = reader.position();
             long rePos = pos; // position to re-read
 
             int num;
             int linesRead = 0;
             boolean seenCR = false;
+            buffer.clear();
             while (((num = reader.read(buffer)) != -1)) {
+                buffer.flip();
+
                 for (int i = 0; i < num; i++) {
-                    byte ch = buffer[i];
+                    byte ch = buffer.get();
 
                     switch (ch) {
                         case '\n':
@@ -647,11 +694,12 @@ public class TailFile extends AbstractProcessor {
                     }
                 }
 
-                pos = reader.getFilePointer();
+                pos = reader.position();
             }
 
             getLogger().debug("Read {} lines; repositioning reader from {} to 
{}", new Object[] {linesRead, pos, rePos});
-            reader.seek(rePos); // Ensure we can re-read if necessary
+            reader.position(rePos);
+            buffer.clear();
             return rePos;
         }
     }
@@ -726,23 +774,26 @@ public class TailFile extends AbstractProcessor {
 
 
     /**
-     * A simple Java class to hold information about our state so that we can 
maintain this state across multiple invocations of the Processor
+     * A simple Java class to hold information about our state so that we can 
maintain this state across multiple invocations of the Processor.
+     *
+     * We use a FileChannel to read from the file, rather than a 
BufferedInputStream, etc. because we want to be able to read any amount of data
+     * and then reposition the reader if we need to, as a result of a line not 
being terminated (i.e., no new-line).
      */
     static class TailFileState {
         private final String filename; // hold onto filename and not just File 
because we want to match that against the user-defined filename to recover from
         private final File file;
-        private final RandomAccessFile raf;
+        private final FileChannel fileChannel;
         private final long position;
         private final long timestamp;
         private final Checksum checksum;
-        private final byte[] buffer;
+        private final ByteBuffer buffer;
 
-        public TailFileState(final String filename, final File file, final 
RandomAccessFile raf, final long position, final long timestamp, final Checksum 
checksum, final byte[] buffer) {
+        public TailFileState(final String filename, final File file, final 
FileChannel fileChannel, final long position, final long timestamp, final 
Checksum checksum, final ByteBuffer buffer) {
             this.filename = filename;
             this.file = file;
-            this.raf = raf;
+            this.fileChannel = fileChannel;
             this.position = position;
-            this.timestamp = (timestamp / 1000) * 1000; // many operating 
systems will use only second-level precision for last-modified times so cut off 
milliseconds
+            this.timestamp = timestamp; // many operating systems will use 
only second-level precision for last-modified times so cut off milliseconds
             this.checksum = checksum;
             this.buffer = buffer;
         }
@@ -755,8 +806,8 @@ public class TailFile extends AbstractProcessor {
             return file;
         }
 
-        public RandomAccessFile getReader() {
-            return raf;
+        public FileChannel getReader() {
+            return fileChannel;
         }
 
         public long getPosition() {
@@ -771,7 +822,7 @@ public class TailFile extends AbstractProcessor {
             return checksum;
         }
 
-        public byte[] getBuffer() {
+        public ByteBuffer getBuffer() {
             return buffer;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e862f7ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 85638ad..1445d88 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 
 public class TestTailFile {
     private File file;
+    private TailFile processor;
     private RandomAccessFile raf;
     private TestRunner runner;
 
@@ -64,7 +65,8 @@ public class TestTailFile {
         stateFile.delete();
         Assert.assertFalse(stateFile.exists());
 
-        runner = TestRunners.newTestRunner(new TailFile());
+        processor = new TailFile();
+        runner = TestRunners.newTestRunner(processor);
         runner.setProperty(TailFile.FILENAME, "target/log.txt");
         runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
         runner.assertValid();
@@ -77,6 +79,8 @@ public class TestTailFile {
         if (raf != null) {
             raf.close();
         }
+
+        processor.cleanup();
     }
 
 
@@ -153,6 +157,7 @@ public class TestTailFile {
         runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_TIME.getValue());
 
         raf.write("hello world\n".getBytes());
+        Thread.sleep(1000);
         runner.run(100);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
     }
@@ -231,7 +236,9 @@ public class TestTailFile {
 
         raf.write("world".getBytes());
         raf.close();
-        file.renameTo(new File("target/log1.txt"));
+
+        processor.cleanup(); // Need to do this for Windows because otherwise 
we cannot rename the file because we have the file open still in the same 
process.
+        assertTrue(file.renameTo(new File("target/log1.txt")));
 
         raf = new RandomAccessFile(new File("target/log.txt"), "rw");
         raf.write("1\n".getBytes());
@@ -255,7 +262,7 @@ public class TestTailFile {
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
-        runner.run(1, false, false);
+        runner.run(1, true, false);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
         runner.clearTransferState();
@@ -288,7 +295,7 @@ public class TestTailFile {
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
-        runner.run(1, false, false);
+        runner.run(1, true, false);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
         runner.clearTransferState();
@@ -305,6 +312,8 @@ public class TestTailFile {
         // write to a new file.
         file = new File("target/log.txt");
         raf = new RandomAccessFile(file, "rw");
+
+        Thread.sleep(1000L);
         raf.write("abc\n".getBytes());
 
         // rename file to log.1
@@ -331,9 +340,6 @@ public class TestTailFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
-        final long start = System.currentTimeMillis();
-        Thread.sleep(1100L);
-
         raf.write("Hello, World".getBytes());
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
@@ -346,7 +352,6 @@ public class TestTailFile {
         assertNotNull(state);
         assertEquals("target/log.txt", state.getFilename());
         assertTrue(state.getTimestamp() <= System.currentTimeMillis());
-        assertTrue(state.getTimestamp() >= start);
         assertEquals(14, state.getPosition());
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello,
 World\r\n");
 

Reply via email to