http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
new file mode 100644
index 0000000..41a0557
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file;
+
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.slf4j.Logger;
+
+/**
+ * A utility class containing a few useful static methods to do typical IO
+ * operations.
+ *
+ * @author unattributed
+ */
+public class FileUtils {
+
+    public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 
MB chunks
+    public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
+
+    /**
+     * Closes the given closeable quietly - no logging, no exceptions...
+     *
+     * @param closeable
+     */
+    public static void closeQuietly(final Closeable closeable) {
+        if (null != closeable) {
+            try {
+                closeable.close();
+            } catch (final IOException io) {/*IGNORE*/
+
+            }
+        }
+    }
+
+    /**
+     * Releases the given lock quietly - no logging, no exception
+     *
+     * @param lock
+     */
+    public static void releaseQuietly(final FileLock lock) {
+        if (null != lock) {
+            try {
+                lock.release();
+            } catch (final IOException io) {
+                /*IGNORE*/
+            }
+        }
+    }
+
+    public static void ensureDirectoryExistAndCanAccess(final File dir) throws 
IOException {
+        if (dir.exists() && !dir.isDirectory()) {
+            throw new IOException(dir.getAbsolutePath() + " is not a 
directory");
+        } else if (!dir.exists()) {
+            final boolean made = dir.mkdirs();
+            if (!made) {
+                throw new IOException(dir.getAbsolutePath() + " could not be 
created");
+            }
+        }
+        if (!(dir.canRead() && dir.canWrite())) {
+            throw new IOException(dir.getAbsolutePath() + " directory does not 
have read/write privilege");
+        }
+    }
+
+    /**
+     * Deletes the given file. If the given file exists but could not be 
deleted
+     * this will be printed as a warning to the given logger
+     *
+     * @param file
+     * @param logger
+     * @return
+     */
+    public static boolean deleteFile(final File file, final Logger logger) {
+        return FileUtils.deleteFile(file, logger, 1);
+    }
+
+    /**
+     * Deletes the given file. If the given file exists but could not be 
deleted
+     * this will be printed as a warning to the given logger
+     *
+     * @param file
+     * @param logger
+     * @param attempts indicates how many times an attempt to delete should be
+     * made
+     * @return true if given file no longer exists
+     */
+    public static boolean deleteFile(final File file, final Logger logger, 
final int attempts) {
+        if (file == null) {
+            return false;
+        }
+        boolean isGone = false;
+        try {
+            if (file.exists()) {
+                final int effectiveAttempts = Math.max(1, attempts);
+                for (int i = 0; i < effectiveAttempts && !isGone; i++) {
+                    isGone = file.delete() || !file.exists();
+                    if (!isGone && (effectiveAttempts - i) > 1) {
+                        FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+                    }
+                }
+                if (!isGone && logger != null) {
+                    logger.warn("File appears to exist but unable to delete 
file: " + file.getAbsolutePath());
+                }
+            }
+        } catch (final Throwable t) {
+            if (logger != null) {
+                logger.warn("Unable to delete file: '" + 
file.getAbsolutePath() + "' due to " + t);
+            }
+        }
+        return isGone;
+    }
+
+    /**
+     * Deletes all of the given files. If any exist and cannot be deleted that
+     * will be printed at warn to the given logger.
+     *
+     * @param files can be null
+     * @param logger can be null
+     */
+    public static void deleteFile(final List<File> files, final Logger logger) 
{
+        FileUtils.deleteFile(files, logger, 1);
+    }
+
+    /**
+     * Deletes all of the given files. If any exist and cannot be deleted that
+     * will be printed at warn to the given logger.
+     *
+     * @param files can be null
+     * @param logger can be null
+     * @param attempts indicates how many times an attempt should be made to
+     * delete each file
+     */
+    public static void deleteFile(final List<File> files, final Logger logger, 
final int attempts) {
+        if (null == files || files.isEmpty()) {
+            return;
+        }
+        final int effectiveAttempts = Math.max(1, attempts);
+        for (final File file : files) {
+            try {
+                boolean isGone = false;
+                for (int i = 0; i < effectiveAttempts && !isGone; i++) {
+                    isGone = file.delete() || !file.exists();
+                    if (!isGone && (effectiveAttempts - i) > 1) {
+                        FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+                    }
+                }
+                if (!isGone && logger != null) {
+                    logger.warn("File appears to exist but unable to delete 
file: " + file.getAbsolutePath());
+                }
+            } catch (final Throwable t) {
+                if (null != logger) {
+                    logger.warn("Unable to delete file given from path: '" + 
file.getPath() + "' due to " + t);
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes all files (not directories..) in the given directory (non
+     * recursive) that match the given filename filter. If any file cannot be
+     * deleted then this is printed at warn to the given logger.
+     *
+     * @param directory
+     * @param filter if null then no filter is used
+     * @param logger
+     */
+    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger) {
+        FileUtils.deleteFilesInDir(directory, filter, logger, false);
+    }
+
+    /**
+     * Deletes all files (not directories) in the given directory (recursive)
+     * that match the given filename filter. If any file cannot be deleted then
+     * this is printed at warn to the given logger.
+     *
+     * @param directory
+     * @param filter if null then no filter is used
+     * @param logger
+     * @param recurse
+     */
+    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse) {
+        FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
+    }
+
+    /**
+     * Deletes all files (not directories) in the given directory (recursive)
+     * that match the given filename filter. If any file cannot be deleted then
+     * this is printed at warn to the given logger.
+     *
+     * @param directory
+     * @param filter if null then no filter is used
+     * @param logger
+     * @param recurse
+     * @param deleteEmptyDirectories default is false; if true will delete
+     * directories found that are empty
+     */
+    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse, final 
boolean deleteEmptyDirectories) {
+        // ensure the specified directory is actually a directory and that it 
exists
+        if (null != directory && directory.isDirectory()) {
+            final File ingestFiles[] = directory.listFiles();
+            for (File ingestFile : ingestFiles) {
+                boolean process = (filter == null) ? true : 
filter.accept(directory, ingestFile.getName());
+                if (ingestFile.isFile() && process) {
+                    FileUtils.deleteFile(ingestFile, logger, 3);
+                }
+                if (ingestFile.isDirectory() && recurse) {
+                    FileUtils.deleteFilesInDir(ingestFile, filter, logger, 
recurse, deleteEmptyDirectories);
+                    if (deleteEmptyDirectories && ingestFile.list().length == 
0) {
+                        FileUtils.deleteFile(ingestFile, logger, 3);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes given files.
+     *
+     * @param files
+     * @param recurse will recurse
+     * @throws IOException
+     */
+    public static void deleteFiles(final Collection<File> files, final boolean 
recurse) throws IOException {
+        for (final File file : files) {
+            FileUtils.deleteFile(file, recurse);
+        }
+    }
+
+    public static void deleteFile(final File file, final boolean recurse) 
throws IOException {
+        if (file.isDirectory() && recurse) {
+            FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse);
+        }
+        //now delete the file itself regardless of whether it is plain file or 
a directory
+        if (!FileUtils.deleteFile(file, null, 5)) {
+            throw new IOException("Unable to delete " + 
file.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Randomly generates a sequence of bytes and overwrites the contents of 
the
+     * file a number of times. The file is then deleted.
+     *
+     * @param file File to be overwritten a number of times and, ultimately,
+     * deleted
+     * @param passes Number of times file should be overwritten
+     * @throws IOException if something makes shredding or deleting a problem
+     */
+    public static void shredFile(final File file, final int passes)
+            throws IOException {
+        final Random generator = new Random();
+        final long fileLength = file.length();
+        final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB
+        final byte[] b = new byte[byteArraySize];
+        final long numOfRandomWrites = (fileLength / b.length) + 1;
+        final FileOutputStream fos = new FileOutputStream(file);
+        try {
+            // Over write file contents (passes) times
+            final FileChannel channel = fos.getChannel();
+            for (int i = 0; i < passes; i++) {
+                generator.nextBytes(b);
+                for (int j = 0; j <= numOfRandomWrites; j++) {
+                    fos.write(b);
+                }
+                fos.flush();
+                channel.position(0);
+            }
+            // Write out "0" for each byte in the file
+            Arrays.fill(b, (byte) 0);
+            for (int j = 0; j < numOfRandomWrites; j++) {
+                fos.write(b);
+            }
+            fos.flush();
+            fos.close();
+            // Try to delete the file a few times
+            if (!FileUtils.deleteFile(file, null, 5)) {
+                throw new IOException("Failed to delete file after shredding");
+            }
+
+        } finally {
+            FileUtils.closeQuietly(fos);
+        }
+    }
+
+    public static long copy(final InputStream in, final OutputStream out) 
throws IOException {
+        final byte[] buffer = new byte[65536];
+        long copied = 0L;
+        int len;
+        while ((len = in.read(buffer)) > 0) {
+            out.write(buffer, 0, len);
+            copied += len;
+        }
+
+        return copied;
+    }
+
+    public static long copyBytes(final byte[] bytes, final File destination, 
final boolean lockOutputFile) throws FileNotFoundException, IOException {
+        FileOutputStream fos = null;
+        FileLock outLock = null;
+        long fileSize = 0L;
+        try {
+            fos = new FileOutputStream(destination);
+            final FileChannel out = fos.getChannel();
+            if (lockOutputFile) {
+                outLock = out.tryLock(0, Long.MAX_VALUE, false);
+                if (null == outLock) {
+                    throw new IOException("Unable to obtain exclusive file 
lock for: " + destination.getAbsolutePath());
+                }
+            }
+            fos.write(bytes);
+            fos.flush();
+            fileSize = bytes.length;
+        } finally {
+            FileUtils.releaseQuietly(outLock);
+            FileUtils.closeQuietly(fos);
+        }
+        return fileSize;
+    }
+
+    /**
+     * Copies the given source file to the given destination file. The given
+     * destination will be overwritten if it already exists.
+     *
+     * @param source
+     * @param destination
+     * @param lockInputFile if true will lock input file during copy; if false
+     * will not
+     * @param lockOutputFile if true will lock output file during copy; if 
false
+     * will not
+     * @param move if true will perform what is effectively a move operation
+     * rather than a pure copy. This allows for potentially highly efficient
+     * movement of the file but if not possible this will revert to a copy then
+     * delete behavior. If false, then the file is copied and the source file 
is
+     * retained. If a true rename/move occurs then no lock is held during that
+     * time.
+     * @param logger if failures occur, they will be logged to this logger if
+     * possible. If this logger is null, an IOException will instead be thrown,
+     * indicating the problem.
+     * @return long number of bytes copied
+     * @throws FileNotFoundException if the source file could not be found
+     * @throws IOException
+     * @throws SecurityException if a security manager denies the needed file
+     * operations
+     */
+    public static long copyFile(final File source, final File destination, 
final boolean lockInputFile, final boolean lockOutputFile, final boolean move, 
final Logger logger) throws FileNotFoundException, IOException {
+
+        FileInputStream fis = null;
+        FileOutputStream fos = null;
+        FileLock inLock = null;
+        FileLock outLock = null;
+        long fileSize = 0L;
+        if (!source.canRead()) {
+            throw new IOException("Must at least have read permission");
+
+        }
+        if (move && source.renameTo(destination)) {
+            fileSize = destination.length();
+        } else {
+            try {
+                fis = new FileInputStream(source);
+                fos = new FileOutputStream(destination);
+                final FileChannel in = fis.getChannel();
+                final FileChannel out = fos.getChannel();
+                if (lockInputFile) {
+                    inLock = in.tryLock(0, Long.MAX_VALUE, true);
+                    if (null == inLock) {
+                        throw new IOException("Unable to obtain shared file 
lock for: " + source.getAbsolutePath());
+                    }
+                }
+                if (lockOutputFile) {
+                    outLock = out.tryLock(0, Long.MAX_VALUE, false);
+                    if (null == outLock) {
+                        throw new IOException("Unable to obtain exclusive file 
lock for: " + destination.getAbsolutePath());
+                    }
+                }
+                long bytesWritten = 0;
+                do {
+                    bytesWritten += out.transferFrom(in, bytesWritten, 
TRANSFER_CHUNK_SIZE_BYTES);
+                    fileSize = in.size();
+                } while (bytesWritten < fileSize);
+                out.force(false);
+                FileUtils.closeQuietly(fos);
+                FileUtils.closeQuietly(fis);
+                fos = null;
+                fis = null;
+                if (move && !FileUtils.deleteFile(source, null, 5)) {
+                    if (logger == null) {
+                        FileUtils.deleteFile(destination, null, 5);
+                        throw new IOException("Could not remove file " + 
source.getAbsolutePath());
+                    } else {
+                        logger.warn("Configured to delete source file when 
renaming/move not successful.  However, unable to delete file at: " + 
source.getAbsolutePath());
+                    }
+                }
+            } finally {
+                FileUtils.releaseQuietly(inLock);
+                FileUtils.releaseQuietly(outLock);
+                FileUtils.closeQuietly(fos);
+                FileUtils.closeQuietly(fis);
+            }
+        }
+        return fileSize;
+    }
+
+    /**
+     * Copies the given source file to the given destination file. The given
+     * destination will be overwritten if it already exists.
+     *
+     * @param source
+     * @param destination
+     * @param lockInputFile if true will lock input file during copy; if false
+     * will not
+     * @param lockOutputFile if true will lock output file during copy; if 
false
+     * will not
+     * @param logger
+     * @return long number of bytes copied
+     * @throws FileNotFoundException if the source file could not be found
+     * @throws IOException
+     * @throws SecurityException if a security manager denies the needed file
+     * operations
+     */
+    public static long copyFile(final File source, final File destination, 
final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) 
throws FileNotFoundException, IOException {
+        return FileUtils.copyFile(source, destination, lockInputFile, 
lockOutputFile, false, logger);
+    }
+
+    public static long copyFile(final File source, final OutputStream stream, 
final boolean closeOutputStream, final boolean lockInputFile) throws 
FileNotFoundException, IOException {
+        FileInputStream fis = null;
+        FileLock inLock = null;
+        long fileSize = 0L;
+        try {
+            fis = new FileInputStream(source);
+            final FileChannel in = fis.getChannel();
+            if (lockInputFile) {
+                inLock = in.tryLock(0, Long.MAX_VALUE, true);
+                if (inLock == null) {
+                    throw new IOException("Unable to obtain exclusive file 
lock for: " + source.getAbsolutePath());
+                }
+
+            }
+
+            byte[] buffer = new byte[1 << 18]; //256 KB
+            int bytesRead = -1;
+            while ((bytesRead = fis.read(buffer)) != -1) {
+                stream.write(buffer, 0, bytesRead);
+            }
+            in.force(false);
+            stream.flush();
+            fileSize = in.size();
+        } finally {
+            FileUtils.releaseQuietly(inLock);
+            FileUtils.closeQuietly(fis);
+            if (closeOutputStream) {
+                FileUtils.closeQuietly(stream);
+            }
+        }
+        return fileSize;
+    }
+
+    public static long copyFile(final InputStream stream, final File 
destination, final boolean closeInputStream, final boolean lockOutputFile) 
throws FileNotFoundException, IOException {
+        final Path destPath = destination.toPath();
+        final long size = Files.copy(stream, destPath);
+        if (closeInputStream) {
+            stream.close();
+        }
+        return size;
+    }
+
+    /**
+     * Renames the given file from the source path to the destination path. 
This
+     * handles multiple attempts. This should only be used to rename within a
+     * given directory. Renaming across directories might not work well. See 
the
+     * <code>File.renameTo</code> for more information.
+     *
+     * @param source the file to rename
+     * @param destination the file path to rename to
+     * @param maxAttempts the max number of attempts to attempt the rename
+     * @throws IOException if rename isn't successful
+     */
+    public static void renameFile(final File source, final File destination, 
final int maxAttempts) throws IOException {
+        FileUtils.renameFile(source, destination, maxAttempts, false);
+    }
+
+    /**
+     * Renames the given file from the source path to the destination path. 
This
+     * handles multiple attempts. This should only be used to rename within a
+     * given directory. Renaming across directories might not work well. See 
the
+     * <code>File.renameTo</code> for more information.
+     *
+     * @param source the file to rename
+     * @param destination the file path to rename to
+     * @param maxAttempts the max number of attempts to attempt the rename
+     * @param replace if true and a rename attempt fails will check if a file 
is
+     * already at the destination path. If so it will delete that file and
+     * attempt the rename according the remaining maxAttempts. If false, any
+     * conflicting files will be left as they were and the rename attempts will
+     * fail if conflicting.
+     * @throws IOException if rename isn't successful
+     */
+    public static void renameFile(final File source, final File destination, 
final int maxAttempts, final boolean replace) throws IOException {
+        final int attempts = (replace || maxAttempts < 1) ? Math.max(2, 
maxAttempts) : maxAttempts;
+        boolean renamed = false;
+        for (int i = 0; i < attempts; i++) {
+            renamed = source.renameTo(destination);
+            if (!renamed) {
+                FileUtils.deleteFile(destination, null, 5);
+            } else {
+                break; //rename has succeeded
+            }
+        }
+        if (!renamed) {
+            throw new IOException("Attempted " + maxAttempts + " times but 
unable to rename from \'" + source.getPath() + "\' to \'" + 
destination.getPath() + "\'");
+
+        }
+    }
+
+    public static void sleepQuietly(final long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (final InterruptedException ex) {
+            /* do nothing */
+        }
+    }
+
+    /**
+     * Syncs a primary copy of a file with the copy in the restore directory. 
If
+     * the restore directory does not have a file and the primary has a file,
+     * the the primary's file is copied to the restore directory. Else if the
+     * restore directory has a file, but the primary does not, then the
+     * restore's file is copied to the primary directory. Else if the primary
+     * file is different than the restore file, then an IllegalStateException 
is
+     * thrown. Otherwise, if neither file exists, then no syncing is performed.
+     *
+     * @param primaryFile the primary file
+     * @param restoreFile the restore file
+     * @param logger a logger
+     * @throws IOException if an I/O problem was encountered during syncing
+     * @throws IllegalStateException if the primary and restore copies exist 
but
+     * are different
+     */
+    public static void syncWithRestore(final File primaryFile, final File 
restoreFile, final Logger logger)
+            throws IOException {
+
+        if (primaryFile.exists() && !restoreFile.exists()) {
+            // copy primary file to restore
+            copyFile(primaryFile, restoreFile, false, false, logger);
+        } else if (restoreFile.exists() && !primaryFile.exists()) {
+            // copy restore file to primary
+            copyFile(restoreFile, primaryFile, false, false, logger);
+        } else if (primaryFile.exists() && restoreFile.exists() && 
!isSame(primaryFile, restoreFile)) {
+            throw new IllegalStateException(String.format("Primary file '%s' 
is different than restore file '%s'",
+                    primaryFile.getAbsoluteFile(), 
restoreFile.getAbsolutePath()));
+        }
+    }
+
+    /**
+     * Returns true if the given files are the same according to their MD5 
hash.
+     *
+     * @param file1 a file
+     * @param file2 a file
+     * @return true if the files are the same; false otherwise
+     * @throws IOException if the MD5 hash could not be computed
+     */
+    public static boolean isSame(final File file1, final File file2) throws 
IOException {
+        return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2));
+    }
+
+    /**
+     * Returns the MD5 hash of the given file.
+     *
+     * @param file a file
+     * @return the MD5 hash
+     * @throws IOException if the MD5 hash could not be computed
+     */
+    public static byte[] computeMd5Digest(final File file) throws IOException {
+        final MessageDigest digest;
+        try {
+            digest = MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException nsae) {
+            throw new IOException(nsae);
+        }
+
+        try (final FileInputStream fis = new FileInputStream(file)) {
+            int len;
+            final byte[] buffer = new byte[8192];
+            while ((len = fis.read(buffer)) > -1) {
+                if (len > 0) {
+                    digest.update(buffer, 0, len);
+                }
+            }
+        }
+        return digest.digest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
new file mode 100644
index 0000000..6f9c616
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s
+ * such that it will indicate a change in a file only if ALL sub-monitors
+ * indicate a change. The sub-monitors will be applied in the order given and 
if
+ * any indicates that the state has not changed, the subsequent sub-monitors 
may
+ * not be given a chance to run
+ */
+public class CompoundUpdateMonitor implements UpdateMonitor {
+
+    private final List<UpdateMonitor> monitors;
+
+    public CompoundUpdateMonitor(final UpdateMonitor first, final 
UpdateMonitor... others) {
+        monitors = new ArrayList<>();
+        monitors.add(first);
+        for (final UpdateMonitor monitor : others) {
+            monitors.add(monitor);
+        }
+    }
+
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        return new DeferredMonitorAction(monitors, path);
+    }
+
+    private static class DeferredMonitorAction {
+
+        private static final Object NON_COMPUTED_VALUE = new Object();
+
+        private final List<UpdateMonitor> monitors;
+        private final Path path;
+
+        private final Object[] preCalculated;
+
+        public DeferredMonitorAction(final List<UpdateMonitor> monitors, final 
Path path) {
+            this.monitors = monitors;
+            this.path = path;
+            preCalculated = new Object[monitors.size()];
+
+            for (int i = 0; i < preCalculated.length; i++) {
+                preCalculated[i] = NON_COMPUTED_VALUE;
+            }
+        }
+
+        private Object getCalculatedValue(final int i) throws IOException {
+            if (preCalculated[i] == NON_COMPUTED_VALUE) {
+                preCalculated[i] = monitors.get(i).getCurrentState(path);
+            }
+
+            return preCalculated[i];
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            // must return true unless ALL DeferredMonitorAction's indicate 
that they are different
+            if (obj == null) {
+                return false;
+            }
+
+            if (!(obj instanceof DeferredMonitorAction)) {
+                return false;
+            }
+
+            final DeferredMonitorAction other = (DeferredMonitorAction) obj;
+            try {
+                // Go through each UpdateMonitor's value and check if the 
value has changed.
+                for (int i = 0; i < preCalculated.length; i++) {
+                    final Object mine = getCalculatedValue(i);
+                    final Object theirs = other.getCalculatedValue(i);
+
+                    if (mine == theirs) {
+                        // same
+                        return true;
+                    }
+
+                    if (mine == null && theirs == null) {
+                        // same
+                        return true;
+                    }
+
+                    if (mine.equals(theirs)) {
+                        return true;
+                    }
+                }
+            } catch (final IOException e) {
+                return false;
+            }
+
+            // No DeferredMonitorAction was the same as last time. Therefore, 
it's not equal
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
new file mode 100644
index 0000000..e6be558
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class LastModifiedMonitor implements UpdateMonitor {
+
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        return Files.getLastModifiedTime(path);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
new file mode 100644
index 0000000..8dea4bf
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class MD5SumMonitor implements UpdateMonitor {
+
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        final MessageDigest digest;
+        try {
+            digest = MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException nsae) {
+            throw new AssertionError(nsae);
+        }
+
+        try (final FileInputStream fis = new FileInputStream(path.toFile())) {
+            int len;
+            final byte[] buffer = new byte[8192];
+            while ((len = fis.read(buffer)) > -1) {
+                if (len > 0) {
+                    digest.update(buffer, 0, len);
+                }
+            }
+        }
+
+        // Return a ByteBuffer instead of byte[] because we want equals() to 
do a deep equality
+        return ByteBuffer.wrap(digest.digest());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
new file mode 100644
index 0000000..e0089c1
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Allows the user to configure a {@link java.nio.file.Path Path} to watch for
+ * modifications and periodically poll to check if the file has been modified
+ */
+public class SynchronousFileWatcher {
+
+    private final Path path;
+    private final long checkUpdateMillis;
+    private final UpdateMonitor monitor;
+    private final AtomicReference<StateWrapper> lastState;
+    private final Lock resourceLock = new ReentrantLock();
+
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor 
monitor) {
+        this(path, monitor, 0L);
+    }
+
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor 
monitor, final long checkMillis) {
+        if (checkMillis < 0) {
+            throw new IllegalArgumentException();
+        }
+
+        this.path = path;
+        checkUpdateMillis = checkMillis;
+        this.monitor = monitor;
+
+        Object currentState;
+        try {
+            currentState = monitor.getCurrentState(path);
+        } catch (final IOException e) {
+            currentState = null;
+        }
+
+        this.lastState = new AtomicReference<>(new StateWrapper(currentState));
+    }
+
+    /**
+     * Checks if the file has been updated according to the configured
+     * {@link UpdateMonitor} and resets the state
+     *
+     * @return
+     * @throws IOException
+     */
+    public boolean checkAndReset() throws IOException {
+        if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always 
check
+            return checkForUpdate();
+        } else {
+            final StateWrapper stateWrapper = lastState.get();
+            if (stateWrapper.getTimestamp() < System.currentTimeMillis() - 
checkUpdateMillis) {
+                return checkForUpdate();
+            }
+            return false;
+        }
+    }
+
+    private boolean checkForUpdate() throws IOException {
+        if (resourceLock.tryLock()) {
+            try {
+                final StateWrapper wrapper = lastState.get();
+                final Object newState = monitor.getCurrentState(path);
+                if (newState == null && wrapper.getState() == null) {
+                    return false;
+                }
+                if (newState == null || wrapper.getState() == null) {
+                    lastState.set(new StateWrapper(newState));
+                    return true;
+                }
+
+                final boolean unmodified = newState.equals(wrapper.getState());
+                if (!unmodified) {
+                    lastState.set(new StateWrapper(newState));
+                }
+                return !unmodified;
+            } finally {
+                resourceLock.unlock();
+            }
+        } else {
+            return false;
+        }
+    }
+
+    private static class StateWrapper {
+
+        private final Object state;
+        private final long timestamp;
+
+        public StateWrapper(final Object state) {
+            this.state = state;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        public Object getState() {
+            return state;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
new file mode 100644
index 0000000..20ed1dd
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public interface UpdateMonitor {
+
+    Object getCurrentState(Path path) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
new file mode 100644
index 0000000..59b444a
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import org.apache.nifi.util.search.ahocorasick.SearchState;
+
+/**
+ * Defines an interface to search for content given a set of search terms. Any
+ * implementation of search must be thread safe.
+ *
+ * @author
+ * @param <T>
+ */
+public interface Search<T> {
+
+    /**
+     * Establishes the dictionary of terms which will be searched in subsequent
+     * search calls. This can be called only once
+     *
+     * @param terms
+     */
+    void initializeDictionary(Set<SearchTerm<T>> terms);
+
+    /**
+     * Searches the given input stream for matches between the already 
specified
+     * dictionary and the contents scanned.
+     *
+     * @param haystack
+     * @param findAll if true will find all matches if false will find only the
+     * first match
+     * @return SearchState containing results Map might be empty which 
indicates
+     * no matches found but will not be null
+     * @throws IOException Thrown for any exceptions occurring while searching.
+     * @throws IllegalStateException if the dictionary has not yet been
+     * initialized
+     */
+    SearchState<T> search(InputStream haystack, boolean findAll) throws 
IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
new file mode 100644
index 0000000..62de964
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * This is an immutable thread safe object representing a search term
+ *
+ * @author
+ * @param <T>
+ */
+public class SearchTerm<T> {
+
+    private final byte[] bytes;
+    private final int hashCode;
+    private final T reference;
+
+    /**
+     * Constructs a SearchTerm. Defensively copies the given byte array
+     *
+     * @param bytes
+     * @throws IllegalArgument exception if given bytes are null or 0 length
+     */
+    public SearchTerm(final byte[] bytes) {
+        this(bytes, true, null);
+    }
+
+    /**
+     * Constructs a search term. Optionally performs a defensive copy of the
+     * given byte array. If the caller indicates a defensive copy is not
+     * necessary then they must not change the given arrays state any longer
+     *
+     * @param bytes
+     * @param defensiveCopy
+     * @param reference
+     */
+    public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T 
reference) {
+        if (bytes == null || bytes.length == 0) {
+            throw new IllegalArgumentException();
+        }
+        if (defensiveCopy) {
+            this.bytes = Arrays.copyOf(bytes, bytes.length);
+        } else {
+            this.bytes = bytes;
+        }
+        this.hashCode = Arrays.hashCode(this.bytes);
+        this.reference = reference;
+    }
+
+    public int get(final int index) {
+        return bytes[index] & 0xff;
+    }
+
+    /**
+     * @return size in of search term in bytes
+     */
+    public int size() {
+        return bytes.length;
+    }
+
+    /**
+     * @return reference object for this given search term
+     */
+    public T getReference() {
+        return reference;
+    }
+
+    /**
+     * Determines if the given window starts with the same bytes as this term
+     *
+     * @param window Current window of bytes from the haystack being evaluated.
+     * @param windowLength The length of the window to consider
+     * @return true if this term starts with the same bytes of the given window
+     */
+    public boolean startsWith(byte[] window, int windowLength) {
+        if (windowLength > window.length) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (bytes.length < windowLength) {
+            return false;
+        }
+        for (int i = 0; i < bytes.length && i < windowLength; i++) {
+            if (bytes[i] != window[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return a defensive copy of the internal byte structure
+     */
+    public byte[] getBytes() {
+        return Arrays.copyOf(bytes, bytes.length);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final SearchTerm other = (SearchTerm) obj;
+        if (this.hashCode != other.hashCode) {
+            return false;
+        }
+        return Arrays.equals(this.bytes, other.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return new String(bytes);
+    }
+
+    public String toString(final Charset charset) {
+        return new String(bytes, charset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
new file mode 100644
index 0000000..3b8afaf
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.nifi.util.search.Search;
+import org.apache.nifi.util.search.SearchTerm;
+
+public class AhoCorasick<T> implements Search<T> {
+
+    private Node root = null;
+
+    /**
+     * Constructs a new search object.
+     *
+     * @throws IllegalArgumentException if given terms are null or empty
+     */
+    public AhoCorasick() {
+    }
+
+    @Override
+    public void initializeDictionary(final Set<SearchTerm<T>> terms) {
+        if (root != null) {
+            throw new IllegalStateException();
+        }
+        root = new Node();
+        if (terms == null || terms.isEmpty()) {
+            throw new IllegalArgumentException();
+        }
+        for (final SearchTerm<T> term : terms) {
+            int i = 0;
+            Node nextNode = root;
+            while (true) {
+                nextNode = addMatch(term, i, nextNode);
+                if (nextNode == null) {
+                    break; //we're done
+                }
+                i++;
+            }
+        }
+        initialize();
+    }
+
+    private Node addMatch(final SearchTerm<T> term, final int offset, final 
Node current) {
+        final int index = term.get(offset);
+        boolean atEnd = (offset == (term.size() - 1));
+        if (current.getNeighbor(index) == null) {
+            if (atEnd) {
+                current.setNeighbor(new Node(term), index);
+                return null;
+            }
+            current.setNeighbor(new Node(), index);
+        } else if (atEnd) {
+            current.getNeighbor(index).setMatchingTerm(term);
+            return null;
+        }
+        return current.getNeighbor(index);
+    }
+
+    private void initialize() {
+        //perform bgs to build failure links
+        final Queue<Node> queue = new LinkedList<>();
+        queue.add(root);
+        root.setFailureNode(null);
+        while (!queue.isEmpty()) {
+            final Node current = queue.poll();
+            for (int i = 0; i < 256; i++) {
+                final Node next = current.getNeighbor(i);
+                if (next != null) {
+                    //traverse failure to get state
+                    Node fail = current.getFailureNode();
+                    while ((fail != null) && fail.getNeighbor(i) == null) {
+                        fail = fail.getFailureNode();
+                    }
+                    if (fail != null) {
+                        next.setFailureNode(fail.getNeighbor(i));
+                    } else {
+                        next.setFailureNode(root);
+                    }
+                    queue.add(next);
+                }
+            }
+        }
+    }
+
+    @Override
+    public SearchState search(final InputStream stream, final boolean findAll) 
throws IOException {
+        return search(stream, findAll, null);
+    }
+
+    private SearchState search(final InputStream stream, final boolean 
findAll, final SearchState state) throws IOException {
+        if (root == null) {
+            throw new IllegalStateException();
+        }
+        final SearchState<T> currentState = (state == null) ? new 
SearchState(root) : state;
+        if (!findAll && currentState.foundMatch()) {
+            throw new IllegalStateException("A match has already been found 
yet we're being asked to keep searching");
+        }
+        Node current = currentState.getCurrentNode();
+        int currentChar;
+        while ((currentChar = stream.read()) >= 0) {
+            currentState.incrementBytesRead(1L);
+            Node next = current.getNeighbor(currentChar);
+            if (next == null) {
+                next = current.getFailureNode();
+                while ((next != null) && next.getNeighbor(currentChar) == 
null) {
+                    next = next.getFailureNode();
+                }
+                if (next != null) {
+                    next = next.getNeighbor(currentChar);
+                } else {
+                    next = root;
+                }
+            }
+            if (next == null) {
+                throw new IllegalStateException("tree out of sync");
+            }
+            //Accept condition
+            if (next.hasMatch()) {
+                currentState.addResult(next.getMatchingTerm());
+            }
+            for (Node failNode = next.getFailureNode(); failNode != null; 
failNode = failNode.getFailureNode()) {
+                if (failNode.hasMatch()) {
+                    currentState.addResult(failNode.getMatchingTerm());
+                }
+            }
+            current = next;
+            if (currentState.foundMatch() && !findAll) {
+                break;//give up as soon as we have at least one match
+            }
+        }
+        currentState.setCurrentNode(current);
+        return currentState;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
new file mode 100644
index 0000000..0ac325c
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+/**
+ *
+ * @author
+ */
+public class Node {
+
+    private final Map<Integer, Node> neighborMap;
+    private Node failureNode;
+    private SearchTerm<?> term;
+
+    Node(final SearchTerm<?> term) {
+        this();
+        this.term = term;
+    }
+
+    Node() {
+        neighborMap = new HashMap<>();
+        term = null;
+    }
+
+    void setFailureNode(final Node fail) {
+        failureNode = fail;
+    }
+
+    public Node getFailureNode() {
+        return failureNode;
+    }
+
+    public boolean hasMatch() {
+        return term != null;
+    }
+
+    void setMatchingTerm(final SearchTerm<?> term) {
+        this.term = term;
+    }
+
+    public SearchTerm<?> getMatchingTerm() {
+        return term;
+    }
+
+    public Node getNeighbor(final int index) {
+        return neighborMap.get(index);
+    }
+
+    void setNeighbor(final Node neighbor, final int index) {
+        neighborMap.put(index, neighbor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
new file mode 100644
index 0000000..6d36ad0
--- /dev/null
+++ 
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+public class SearchState<T> {
+
+    private Node currentNode;
+    private final Map<SearchTerm<T>, List<Long>> resultMap;
+    private long bytesRead;
+
+    SearchState(final Node rootNode) {
+        resultMap = new HashMap<>(5);
+        currentNode = rootNode;
+        bytesRead = 0L;
+    }
+
+    void incrementBytesRead(final long increment) {
+        bytesRead += increment;
+    }
+
+    void setCurrentNode(final Node curr) {
+        currentNode = curr;
+    }
+
+    public Node getCurrentNode() {
+        return currentNode;
+    }
+
+    public Map<SearchTerm<T>, List<Long>> getResults() {
+        return new HashMap<>(resultMap);
+    }
+
+    void addResult(final SearchTerm matchingTerm) {
+        final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? 
resultMap.get(matchingTerm) : new ArrayList<Long>(5);
+        indexes.add(bytesRead);
+        resultMap.put(matchingTerm, indexes);
+    }
+
+    public boolean foundMatch() {
+        return !resultMap.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
new file mode 100644
index 0000000..bd30a96
--- /dev/null
+++ 
b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+
+import org.junit.Test;
+
+public class TestCompressionInputOutputStreams {
+
+    @Test
+    public void testSimple() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final byte[] data = "Hello, World!".getBytes("UTF-8");
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos);
+        cos.write(data);
+        cos.flush();
+        cos.close();
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final CompressionInputStream cis = new CompressionInputStream(new 
ByteArrayInputStream(compressedBytes));
+        final byte[] decompressed = readFully(cis);
+
+        assertTrue(Arrays.equals(data, decompressed));
+    }
+
+    @Test
+    public void testDataLargerThanBuffer() throws IOException {
+        final String str = "The quick brown fox jumps over the lazy 
dog\r\n\n\n\r";
+
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 100; i++) {
+            sb.append(str);
+        }
+        final byte[] data = sb.toString().getBytes("UTF-8");
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos, 
8192);
+        cos.write(data);
+        cos.flush();
+        cos.close();
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final CompressionInputStream cis = new CompressionInputStream(new 
ByteArrayInputStream(compressedBytes));
+        final byte[] decompressed = readFully(cis);
+
+        assertTrue(Arrays.equals(data, decompressed));
+    }
+
+    @Test
+    public void testDataLargerThanBufferWhileFlushing() throws IOException {
+        final String str = "The quick brown fox jumps over the lazy 
dog\r\n\n\n\r";
+        final byte[] data = str.getBytes("UTF-8");
+
+        final StringBuilder sb = new StringBuilder();
+        final byte[] data1024;
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos, 
8192);
+        for (int i = 0; i < 1024; i++) {
+            cos.write(data);
+            cos.flush();
+            sb.append(str);
+        }
+        cos.close();
+        data1024 = sb.toString().getBytes("UTF-8");
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final CompressionInputStream cis = new CompressionInputStream(new 
ByteArrayInputStream(compressedBytes));
+        final byte[] decompressed = readFully(cis);
+
+        assertTrue(Arrays.equals(data1024, decompressed));
+    }
+
+    @Test
+    public void testSendingMultipleFilesBackToBackOnSameStream() throws 
IOException {
+        final String str = "The quick brown fox jumps over the lazy 
dog\r\n\n\n\r";
+        final byte[] data = str.getBytes("UTF-8");
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos, 
8192);
+        for (int i = 0; i < 512; i++) {
+            cos.write(data);
+            cos.flush();
+        }
+        cos.close();
+
+        final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 
8192);
+        for (int i = 0; i < 512; i++) {
+            cos2.write(data);
+            cos2.flush();
+        }
+        cos2.close();
+
+        final byte[] data512;
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 512; i++) {
+            sb.append(str);
+        }
+        data512 = sb.toString().getBytes("UTF-8");
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(compressedBytes);
+
+        final CompressionInputStream cis = new CompressionInputStream(bais);
+        final byte[] decompressed = readFully(cis);
+        assertTrue(Arrays.equals(data512, decompressed));
+
+        final CompressionInputStream cis2 = new CompressionInputStream(bais);
+        final byte[] decompressed2 = readFully(cis2);
+        assertTrue(Arrays.equals(data512, decompressed2));
+    }
+
+    private byte[] readFully(final InputStream in) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final byte[] buffer = new byte[65536];
+        int len;
+        while ((len = in.read(buffer)) >= 0) {
+            baos.write(buffer, 0, len);
+        }
+
+        return baos.toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
new file mode 100644
index 0000000..52bd8de
--- /dev/null
+++ 
b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Tests are time-based")
+public class TestLeakyBucketThrottler {
+
+    @Test(timeout = 10000)
+    public void testOutputStreamInterface() throws IOException {
+        // throttle rate at 1 MB/sec
+        final LeakyBucketStreamThrottler throttler = new 
LeakyBucketStreamThrottler(1024 * 1024);
+
+        final byte[] data = new byte[1024 * 1024 * 4];
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final OutputStream throttledOut = 
throttler.newThrottledOutputStream(baos);
+
+        final long start = System.currentTimeMillis();
+        throttledOut.write(data);
+        throttler.close();
+        final long millis = System.currentTimeMillis() - start;
+        // should take 4 sec give or take
+        assertTrue(millis > 3000);
+        assertTrue(millis < 6000);
+    }
+
+    @Test(timeout = 10000)
+    public void testInputStreamInterface() throws IOException {
+        // throttle rate at 1 MB/sec
+        final LeakyBucketStreamThrottler throttler = new 
LeakyBucketStreamThrottler(1024 * 1024);
+
+        final byte[] data = new byte[1024 * 1024 * 4];
+        final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        final InputStream throttledIn = 
throttler.newThrottledInputStream(bais);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final byte[] buffer = new byte[4096];
+        final long start = System.currentTimeMillis();
+        int len;
+        while ((len = throttledIn.read(buffer)) > 0) {
+            baos.write(buffer, 0, len);
+        }
+        throttler.close();
+        final long millis = System.currentTimeMillis() - start;
+        // should take 4 sec give or take
+        assertTrue(millis > 3000);
+        assertTrue(millis < 6000);
+        baos.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testDirectInterface() throws IOException, InterruptedException 
{
+        // throttle rate at 1 MB/sec
+        final LeakyBucketStreamThrottler throttler = new 
LeakyBucketStreamThrottler(1024 * 1024);
+
+        // create 3 threads, each sending ~2 MB
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < 3; i++) {
+            final Thread t = new WriterThread(i, throttler, baos);
+            threads.add(t);
+        }
+
+        final long start = System.currentTimeMillis();
+        for (final Thread t : threads) {
+            t.start();
+        }
+
+        for (final Thread t : threads) {
+            t.join();
+        }
+        final long elapsed = System.currentTimeMillis() - start;
+
+        throttler.close();
+
+        // To send 15 MB, it should have taken at least 5 seconds and no more 
than 7 seconds, to
+        // allow for busy-ness and the fact that we could write a tiny bit 
more than the limit.
+        assertTrue(elapsed > 5000);
+        assertTrue(elapsed < 7000);
+
+        // ensure bytes were copied out appropriately
+        assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
+        assertEquals((byte) 'A', 
baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
+    }
+
+    private static class WriterThread extends Thread {
+
+        private final int idx;
+        private final byte[] data = new byte[1024 * 1024 * 2 + 1];
+        private final LeakyBucketStreamThrottler throttler;
+        private final OutputStream out;
+
+        public WriterThread(final int idx, final LeakyBucketStreamThrottler 
throttler, final OutputStream out) {
+            this.idx = idx;
+            this.throttler = throttler;
+            this.out = out;
+            this.data[this.data.length - 1] = (byte) 'A';
+        }
+
+        @Override
+        public void run() {
+            long startMillis = System.currentTimeMillis();
+            long bytesWritten = 0L;
+            try {
+                throttler.copy(new ByteArrayInputStream(data), out);
+            } catch (IOException e) {
+                e.printStackTrace();
+                return;
+            }
+            long now = System.currentTimeMillis();
+            long millisElapsed = now - startMillis;
+            bytesWritten += data.length;
+            float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 
1000F;
+            System.out.println(idx + " : copied data at a rate of " + 
bytesPerSec + " bytes/sec");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
new file mode 100644
index 0000000..0838e96
--- /dev/null
+++ 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class TestNaiveSearchRingBuffer {
+
+    @Test
+    public void testAddAndCompare() {
+        final byte[] pattern = new byte[]{
+            '\r', '0', 38, 48
+        };
+
+        final byte[] search = new byte[]{
+            '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38
+        };
+
+        final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern);
+        int counter = -1;
+        for (final byte b : search) {
+            counter++;
+            final boolean matched = circ.addAndCompare(b);
+            if (counter == 10) {
+                assertTrue(matched);
+            } else {
+                assertFalse(matched);
+            }
+        }
+    }
+
+    @Test
+    public void testGetOldestByte() {
+        final byte[] pattern = new byte[]{
+            '\r', '0', 38, 48
+        };
+
+        final byte[] search = new byte[]{
+            '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, 
'\r', '0', 38, 48, 83, 92, 78, 4, 38
+        };
+
+        final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern);
+        int counter = -1;
+        for (final byte b : search) {
+            counter++;
+            final boolean matched = circ.addAndCompare(b);
+            if (counter == 13) {
+                assertTrue(matched);
+            } else {
+                assertFalse(matched);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
new file mode 100644
index 0000000..f576e94
--- /dev/null
+++ 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.UUID;
+
+
+import org.junit.Test;
+
+public class TestCompoundUpdateMonitor {
+
+    @Test
+    public void test() throws IOException {
+        final UpdateMonitor lastModified = new LastModifiedMonitor();
+        final MD5SumMonitor md5 = new MD5SumMonitor();
+        final CompoundUpdateMonitor compound = new 
CompoundUpdateMonitor(lastModified, md5);
+
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        if (file.exists()) {
+            assertTrue(file.delete());
+        }
+        assertTrue(file.createNewFile());
+
+        final Path path = file.toPath();
+
+        final Object curState = compound.getCurrentState(path);
+        final Object state2 = compound.getCurrentState(path);
+
+        assertEquals(curState, state2);
+        file.setLastModified(System.currentTimeMillis() + 1000L);
+        final Object state3 = compound.getCurrentState(path);
+        assertEquals(state2, state3);
+
+        final Object state4 = compound.getCurrentState(path);
+        assertEquals(state3, state4);
+
+        final long lastModifiedDate = file.lastModified();
+        try (final OutputStream out = new FileOutputStream(file)) {
+            out.write("Hello".getBytes("UTF-8"));
+        }
+
+        file.setLastModified(lastModifiedDate);
+
+        final Object state5 = compound.getCurrentState(path);
+        assertNotSame(state4, state5);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
new file mode 100644
index 0000000..7125581
--- /dev/null
+++ 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import org.junit.Test;
+
+
+public class TestSynchronousFileWatcher {
+
+    @Test
+    public void testIt() throws UnsupportedEncodingException, IOException, 
InterruptedException {
+        final Path path = Paths.get("target/1.txt");
+        Files.copy(new ByteArrayInputStream("Hello, 
World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING);
+        final UpdateMonitor monitor = new MD5SumMonitor();
+
+        final SynchronousFileWatcher watcher = new 
SynchronousFileWatcher(path, monitor, 10L);
+        assertFalse(watcher.checkAndReset());
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+
+        final FileOutputStream fos = new FileOutputStream(path.toFile());
+        try {
+            fos.write("Good-bye, World!".getBytes("UTF-8"));
+            fos.getFD().sync();
+        } finally {
+            fos.close();
+        }
+
+        assertTrue(watcher.checkAndReset());
+        assertFalse(watcher.checkAndReset());
+
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
deleted file mode 100644
index c796a96..0000000
--- 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Path;
-import java.util.UUID;
-
-import org.apache.nifi.io.CompoundUpdateMonitor;
-import org.apache.nifi.io.LastModifiedMonitor;
-import org.apache.nifi.io.MD5SumMonitor;
-import org.apache.nifi.io.UpdateMonitor;
-
-import org.junit.Test;
-
-public class TestCompoundUpdateMonitor {
-
-    @Test
-    public void test() throws IOException {
-        final UpdateMonitor lastModified = new LastModifiedMonitor();
-        final MD5SumMonitor md5 = new MD5SumMonitor();
-        final CompoundUpdateMonitor compound = new 
CompoundUpdateMonitor(lastModified, md5);
-
-        final File file = new File("target/" + UUID.randomUUID().toString());
-        if (file.exists()) {
-            assertTrue(file.delete());
-        }
-        assertTrue(file.createNewFile());
-
-        final Path path = file.toPath();
-
-        final Object curState = compound.getCurrentState(path);
-        final Object state2 = compound.getCurrentState(path);
-
-        assertEquals(curState, state2);
-        file.setLastModified(System.currentTimeMillis() + 1000L);
-        final Object state3 = compound.getCurrentState(path);
-        assertEquals(state2, state3);
-
-        final Object state4 = compound.getCurrentState(path);
-        assertEquals(state3, state4);
-
-        final long lastModifiedDate = file.lastModified();
-        try (final OutputStream out = new FileOutputStream(file)) {
-            out.write("Hello".getBytes("UTF-8"));
-        }
-
-        file.setLastModified(lastModifiedDate);
-
-        final Object state5 = compound.getCurrentState(path);
-        assertNotSame(state4, state5);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
deleted file mode 100644
index 4b2c0d5..0000000
--- 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.timebuffer;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-
-import org.junit.Test;
-
-import org.apache.nifi.io.MD5SumMonitor;
-import org.apache.nifi.io.SynchronousFileWatcher;
-import org.apache.nifi.io.UpdateMonitor;
-
-public class TestSynchronousFileWatcher {
-
-    @Test
-    public void testIt() throws UnsupportedEncodingException, IOException, 
InterruptedException {
-        final Path path = Paths.get("target/1.txt");
-        Files.copy(new ByteArrayInputStream("Hello, 
World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING);
-        final UpdateMonitor monitor = new MD5SumMonitor();
-
-        final SynchronousFileWatcher watcher = new 
SynchronousFileWatcher(path, monitor, 10L);
-        assertFalse(watcher.checkAndReset());
-        Thread.sleep(30L);
-        assertFalse(watcher.checkAndReset());
-
-        final FileOutputStream fos = new FileOutputStream(path.toFile());
-        try {
-            fos.write("Good-bye, World!".getBytes("UTF-8"));
-            fos.getFD().sync();
-        } finally {
-            fos.close();
-        }
-
-        assertTrue(watcher.checkAndReset());
-        assertFalse(watcher.checkAndReset());
-
-        Thread.sleep(30L);
-        assertFalse(watcher.checkAndReset());
-    }
-}

Reply via email to