http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java new file mode 100644 index 0000000..41a0557 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.file; + +import java.io.BufferedInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.slf4j.Logger; + +/** + * A utility class containing a few useful static methods to do typical IO + * operations. + * + * @author unattributed + */ +public class FileUtils { + + public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 MB chunks + public static final long MILLIS_BETWEEN_ATTEMPTS = 50L; + + /** + * Closes the given closeable quietly - no logging, no exceptions... 
+ * + * @param closeable + */ + public static void closeQuietly(final Closeable closeable) { + if (null != closeable) { + try { + closeable.close(); + } catch (final IOException io) {/*IGNORE*/ + + } + } + } + + /** + * Releases the given lock quietly - no logging, no exception + * + * @param lock + */ + public static void releaseQuietly(final FileLock lock) { + if (null != lock) { + try { + lock.release(); + } catch (final IOException io) { + /*IGNORE*/ + } + } + } + + public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException { + if (dir.exists() && !dir.isDirectory()) { + throw new IOException(dir.getAbsolutePath() + " is not a directory"); + } else if (!dir.exists()) { + final boolean made = dir.mkdirs(); + if (!made) { + throw new IOException(dir.getAbsolutePath() + " could not be created"); + } + } + if (!(dir.canRead() && dir.canWrite())) { + throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege"); + } + } + + /** + * Deletes the given file. If the given file exists but could not be deleted + * this will be printed as a warning to the given logger + * + * @param file + * @param logger + * @return + */ + public static boolean deleteFile(final File file, final Logger logger) { + return FileUtils.deleteFile(file, logger, 1); + } + + /** + * Deletes the given file. 
If the given file exists but could not be deleted + * this will be printed as a warning to the given logger + * + * @param file + * @param logger + * @param attempts indicates how many times an attempt to delete should be + * made + * @return true if given file no longer exists + */ + public static boolean deleteFile(final File file, final Logger logger, final int attempts) { + if (file == null) { + return false; + } + boolean isGone = false; + try { + if (file.exists()) { + final int effectiveAttempts = Math.max(1, attempts); + for (int i = 0; i < effectiveAttempts && !isGone; i++) { + isGone = file.delete() || !file.exists(); + if (!isGone && (effectiveAttempts - i) > 1) { + FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS); + } + } + if (!isGone && logger != null) { + logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath()); + } + } + } catch (final Throwable t) { + if (logger != null) { + logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t); + } + } + return isGone; + } + + /** + * Deletes all of the given files. If any exist and cannot be deleted that + * will be printed at warn to the given logger. + * + * @param files can be null + * @param logger can be null + */ + public static void deleteFile(final List<File> files, final Logger logger) { + FileUtils.deleteFile(files, logger, 1); + } + + /** + * Deletes all of the given files. If any exist and cannot be deleted that + * will be printed at warn to the given logger. 
+ * + * @param files can be null + * @param logger can be null + * @param attempts indicates how many times an attempt should be made to + * delete each file + */ + public static void deleteFile(final List<File> files, final Logger logger, final int attempts) { + if (null == files || files.isEmpty()) { + return; + } + final int effectiveAttempts = Math.max(1, attempts); + for (final File file : files) { + try { + boolean isGone = false; + for (int i = 0; i < effectiveAttempts && !isGone; i++) { + isGone = file.delete() || !file.exists(); + if (!isGone && (effectiveAttempts - i) > 1) { + FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS); + } + } + if (!isGone && logger != null) { + logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath()); + } + } catch (final Throwable t) { + if (null != logger) { + logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t); + } + } + } + } + + /** + * Deletes all files (not directories..) in the given directory (non + * recursive) that match the given filename filter. If any file cannot be + * deleted then this is printed at warn to the given logger. + * + * @param directory + * @param filter if null then no filter is used + * @param logger + */ + public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) { + FileUtils.deleteFilesInDir(directory, filter, logger, false); + } + + /** + * Deletes all files (not directories) in the given directory (recursive) + * that match the given filename filter. If any file cannot be deleted then + * this is printed at warn to the given logger. 
+ * + * @param directory + * @param filter if null then no filter is used + * @param logger + * @param recurse + */ + public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) { + FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false); + } + + /** + * Deletes all files (not directories) in the given directory (recursive) + * that match the given filename filter. If any file cannot be deleted then + * this is printed at warn to the given logger. + * + * @param directory + * @param filter if null then no filter is used + * @param logger + * @param recurse + * @param deleteEmptyDirectories default is false; if true will delete + * directories found that are empty + */ + public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) { + // ensure the specified directory is actually a directory and that it exists + if (null != directory && directory.isDirectory()) { + final File ingestFiles[] = directory.listFiles(); + for (File ingestFile : ingestFiles) { + boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName()); + if (ingestFile.isFile() && process) { + FileUtils.deleteFile(ingestFile, logger, 3); + } + if (ingestFile.isDirectory() && recurse) { + FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories); + if (deleteEmptyDirectories && ingestFile.list().length == 0) { + FileUtils.deleteFile(ingestFile, logger, 3); + } + } + } + } + } + + /** + * Deletes given files. 
+ * + * @param files + * @param recurse will recurse + * @throws IOException + */ + public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException { + for (final File file : files) { + FileUtils.deleteFile(file, recurse); + } + } + + public static void deleteFile(final File file, final boolean recurse) throws IOException { + if (file.isDirectory() && recurse) { + FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse); + } + //now delete the file itself regardless of whether it is plain file or a directory + if (!FileUtils.deleteFile(file, null, 5)) { + throw new IOException("Unable to delete " + file.getAbsolutePath()); + } + } + + /** + * Randomly generates a sequence of bytes and overwrites the contents of the + * file a number of times. The file is then deleted. + * + * @param file File to be overwritten a number of times and, ultimately, + * deleted + * @param passes Number of times file should be overwritten + * @throws IOException if something makes shredding or deleting a problem + */ + public static void shredFile(final File file, final int passes) + throws IOException { + final Random generator = new Random(); + final long fileLength = file.length(); + final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB + final byte[] b = new byte[byteArraySize]; + final long numOfRandomWrites = (fileLength / b.length) + 1; + final FileOutputStream fos = new FileOutputStream(file); + try { + // Over write file contents (passes) times + final FileChannel channel = fos.getChannel(); + for (int i = 0; i < passes; i++) { + generator.nextBytes(b); + for (int j = 0; j <= numOfRandomWrites; j++) { + fos.write(b); + } + fos.flush(); + channel.position(0); + } + // Write out "0" for each byte in the file + Arrays.fill(b, (byte) 0); + for (int j = 0; j < numOfRandomWrites; j++) { + fos.write(b); + } + fos.flush(); + fos.close(); + // Try to delete the file a few times + if (!FileUtils.deleteFile(file, null, 5)) { + 
throw new IOException("Failed to delete file after shredding"); + } + + } finally { + FileUtils.closeQuietly(fos); + } + } + + public static long copy(final InputStream in, final OutputStream out) throws IOException { + final byte[] buffer = new byte[65536]; + long copied = 0L; + int len; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + copied += len; + } + + return copied; + } + + public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException { + FileOutputStream fos = null; + FileLock outLock = null; + long fileSize = 0L; + try { + fos = new FileOutputStream(destination); + final FileChannel out = fos.getChannel(); + if (lockOutputFile) { + outLock = out.tryLock(0, Long.MAX_VALUE, false); + if (null == outLock) { + throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath()); + } + } + fos.write(bytes); + fos.flush(); + fileSize = bytes.length; + } finally { + FileUtils.releaseQuietly(outLock); + FileUtils.closeQuietly(fos); + } + return fileSize; + } + + /** + * Copies the given source file to the given destination file. The given + * destination will be overwritten if it already exists. + * + * @param source + * @param destination + * @param lockInputFile if true will lock input file during copy; if false + * will not + * @param lockOutputFile if true will lock output file during copy; if false + * will not + * @param move if true will perform what is effectively a move operation + * rather than a pure copy. This allows for potentially highly efficient + * movement of the file but if not possible this will revert to a copy then + * delete behavior. If false, then the file is copied and the source file is + * retained. If a true rename/move occurs then no lock is held during that + * time. + * @param logger if failures occur, they will be logged to this logger if + * possible. 
If this logger is null, an IOException will instead be thrown, + * indicating the problem. + * @return long number of bytes copied + * @throws FileNotFoundException if the source file could not be found + * @throws IOException + * @throws SecurityException if a security manager denies the needed file + * operations + */ + public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final boolean move, final Logger logger) throws FileNotFoundException, IOException { + + FileInputStream fis = null; + FileOutputStream fos = null; + FileLock inLock = null; + FileLock outLock = null; + long fileSize = 0L; + if (!source.canRead()) { + throw new IOException("Must at least have read permission"); + + } + if (move && source.renameTo(destination)) { + fileSize = destination.length(); + } else { + try { + fis = new FileInputStream(source); + fos = new FileOutputStream(destination); + final FileChannel in = fis.getChannel(); + final FileChannel out = fos.getChannel(); + if (lockInputFile) { + inLock = in.tryLock(0, Long.MAX_VALUE, true); + if (null == inLock) { + throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath()); + } + } + if (lockOutputFile) { + outLock = out.tryLock(0, Long.MAX_VALUE, false); + if (null == outLock) { + throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath()); + } + } + long bytesWritten = 0; + do { + bytesWritten += out.transferFrom(in, bytesWritten, TRANSFER_CHUNK_SIZE_BYTES); + fileSize = in.size(); + } while (bytesWritten < fileSize); + out.force(false); + FileUtils.closeQuietly(fos); + FileUtils.closeQuietly(fis); + fos = null; + fis = null; + if (move && !FileUtils.deleteFile(source, null, 5)) { + if (logger == null) { + FileUtils.deleteFile(destination, null, 5); + throw new IOException("Could not remove file " + source.getAbsolutePath()); + } else { + logger.warn("Configured to delete source file 
when renaming/move not successful. However, unable to delete file at: " + source.getAbsolutePath()); + } + } + } finally { + FileUtils.releaseQuietly(inLock); + FileUtils.releaseQuietly(outLock); + FileUtils.closeQuietly(fos); + FileUtils.closeQuietly(fis); + } + } + return fileSize; + } + + /** + * Copies the given source file to the given destination file. The given + * destination will be overwritten if it already exists. + * + * @param source + * @param destination + * @param lockInputFile if true will lock input file during copy; if false + * will not + * @param lockOutputFile if true will lock output file during copy; if false + * will not + * @param logger + * @return long number of bytes copied + * @throws FileNotFoundException if the source file could not be found + * @throws IOException + * @throws SecurityException if a security manager denies the needed file + * operations + */ + public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException { + return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger); + } + + public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException { + FileInputStream fis = null; + FileLock inLock = null; + long fileSize = 0L; + try { + fis = new FileInputStream(source); + final FileChannel in = fis.getChannel(); + if (lockInputFile) { + inLock = in.tryLock(0, Long.MAX_VALUE, true); + if (inLock == null) { + throw new IOException("Unable to obtain exclusive file lock for: " + source.getAbsolutePath()); + } + + } + + byte[] buffer = new byte[1 << 18]; //256 KB + int bytesRead = -1; + while ((bytesRead = fis.read(buffer)) != -1) { + stream.write(buffer, 0, bytesRead); + } + in.force(false); + stream.flush(); + fileSize = in.size(); + } finally { + 
FileUtils.releaseQuietly(inLock); + FileUtils.closeQuietly(fis); + if (closeOutputStream) { + FileUtils.closeQuietly(stream); + } + } + return fileSize; + } + + public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException { + final Path destPath = destination.toPath(); + final long size = Files.copy(stream, destPath); + if (closeInputStream) { + stream.close(); + } + return size; + } + + /** + * Renames the given file from the source path to the destination path. This + * handles multiple attempts. This should only be used to rename within a + * given directory. Renaming across directories might not work well. See the + * <code>File.renameTo</code> for more information. + * + * @param source the file to rename + * @param destination the file path to rename to + * @param maxAttempts the max number of attempts to attempt the rename + * @throws IOException if rename isn't successful + */ + public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException { + FileUtils.renameFile(source, destination, maxAttempts, false); + } + + /** + * Renames the given file from the source path to the destination path. This + * handles multiple attempts. This should only be used to rename within a + * given directory. Renaming across directories might not work well. See the + * <code>File.renameTo</code> for more information. + * + * @param source the file to rename + * @param destination the file path to rename to + * @param maxAttempts the max number of attempts to attempt the rename + * @param replace if true and a rename attempt fails will check if a file is + * already at the destination path. If so it will delete that file and + * attempt the rename according the remaining maxAttempts. If false, any + * conflicting files will be left as they were and the rename attempts will + * fail if conflicting. 
+ * @throws IOException if rename isn't successful + */ + public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException { + final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts; + boolean renamed = false; + for (int i = 0; i < attempts; i++) { + renamed = source.renameTo(destination); + if (!renamed) { + FileUtils.deleteFile(destination, null, 5); + } else { + break; //rename has succeeded + } + } + if (!renamed) { + throw new IOException("Attempted " + maxAttempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'"); + + } + } + + public static void sleepQuietly(final long millis) { + try { + Thread.sleep(millis); + } catch (final InterruptedException ex) { + /* do nothing */ + } + } + + /** + * Syncs a primary copy of a file with the copy in the restore directory. If + * the restore directory does not have a file and the primary has a file, + * the the primary's file is copied to the restore directory. Else if the + * restore directory has a file, but the primary does not, then the + * restore's file is copied to the primary directory. Else if the primary + * file is different than the restore file, then an IllegalStateException is + * thrown. Otherwise, if neither file exists, then no syncing is performed. 
+ * + * @param primaryFile the primary file + * @param restoreFile the restore file + * @param logger a logger + * @throws IOException if an I/O problem was encountered during syncing + * @throws IllegalStateException if the primary and restore copies exist but + * are different + */ + public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger) + throws IOException { + + if (primaryFile.exists() && !restoreFile.exists()) { + // copy primary file to restore + copyFile(primaryFile, restoreFile, false, false, logger); + } else if (restoreFile.exists() && !primaryFile.exists()) { + // copy restore file to primary + copyFile(restoreFile, primaryFile, false, false, logger); + } else if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) { + throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", + primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); + } + } + + /** + * Returns true if the given files are the same according to their MD5 hash. + * + * @param file1 a file + * @param file2 a file + * @return true if the files are the same; false otherwise + * @throws IOException if the MD5 hash could not be computed + */ + public static boolean isSame(final File file1, final File file2) throws IOException { + return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2)); + } + + /** + * Returns the MD5 hash of the given file. 
+ * + * @param file a file + * @return the MD5 hash + * @throws IOException if the MD5 hash could not be computed + */ + public static byte[] computeMd5Digest(final File file) throws IOException { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (final NoSuchAlgorithmException nsae) { + throw new IOException(nsae); + } + + try (final FileInputStream fis = new FileInputStream(file)) { + int len; + final byte[] buffer = new byte[8192]; + while ((len = fis.read(buffer)) > -1) { + if (len > 0) { + digest.update(buffer, 0, len); + } + } + } + return digest.digest(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java new file mode 100644 index 0000000..6f9c616 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.file.monitor; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s + * such that it will indicate a change in a file only if ALL sub-monitors + * indicate a change. 
The sub-monitors will be applied in the order given and if + * any indicates that the state has not changed, the subsequent sub-monitors may + * not be given a chance to run + */ +public class CompoundUpdateMonitor implements UpdateMonitor { + + private final List<UpdateMonitor> monitors; + + public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) { + monitors = new ArrayList<>(); + monitors.add(first); + for (final UpdateMonitor monitor : others) { + monitors.add(monitor); + } + } + + @Override + public Object getCurrentState(final Path path) throws IOException { + return new DeferredMonitorAction(monitors, path); + } + + private static class DeferredMonitorAction { + + private static final Object NON_COMPUTED_VALUE = new Object(); + + private final List<UpdateMonitor> monitors; + private final Path path; + + private final Object[] preCalculated; + + public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) { + this.monitors = monitors; + this.path = path; + preCalculated = new Object[monitors.size()]; + + for (int i = 0; i < preCalculated.length; i++) { + preCalculated[i] = NON_COMPUTED_VALUE; + } + } + + private Object getCalculatedValue(final int i) throws IOException { + if (preCalculated[i] == NON_COMPUTED_VALUE) { + preCalculated[i] = monitors.get(i).getCurrentState(path); + } + + return preCalculated[i]; + } + + @Override + public boolean equals(final Object obj) { + // must return true unless ALL DeferredMonitorAction's indicate that they are different + if (obj == null) { + return false; + } + + if (!(obj instanceof DeferredMonitorAction)) { + return false; + } + + final DeferredMonitorAction other = (DeferredMonitorAction) obj; + try { + // Go through each UpdateMonitor's value and check if the value has changed. 
+ for (int i = 0; i < preCalculated.length; i++) { + final Object mine = getCalculatedValue(i); + final Object theirs = other.getCalculatedValue(i); + + if (mine == theirs) { + // same + return true; + } + + if (mine == null && theirs == null) { + // same + return true; + } + + if (mine.equals(theirs)) { + return true; + } + } + } catch (final IOException e) { + return false; + } + + // No DeferredMonitorAction was the same as last time. Therefore, it's not equal + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java new file mode 100644 index 0000000..e6be558 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.file.monitor; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class LastModifiedMonitor implements UpdateMonitor { + + @Override + public Object getCurrentState(final Path path) throws IOException { + return Files.getLastModifiedTime(path); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java new file mode 100644 index 0000000..8dea4bf --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.file.monitor; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public class MD5SumMonitor implements UpdateMonitor { + + @Override + public Object getCurrentState(final Path path) throws IOException { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (final NoSuchAlgorithmException nsae) { + throw new AssertionError(nsae); + } + + try (final FileInputStream fis = new FileInputStream(path.toFile())) { + int len; + final byte[] buffer = new byte[8192]; + while ((len = fis.read(buffer)) > -1) { + if (len > 0) { + digest.update(buffer, 0, len); + } + } + } + + // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality + return ByteBuffer.wrap(digest.digest()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java new file mode 100644 index 0000000..e0089c1 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.file.monitor; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Allows the user to configure a {@link java.nio.file.Path Path} to watch for + * modifications and periodically poll to check if the file has been modified + */ +public class SynchronousFileWatcher { + + private final Path path; + private final long checkUpdateMillis; + private final UpdateMonitor monitor; + private final AtomicReference<StateWrapper> lastState; + private final Lock resourceLock = new ReentrantLock(); + + public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) { + this(path, monitor, 0L); + } + + public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) { + if (checkMillis < 0) { + throw new IllegalArgumentException(); + } + + this.path = path; + checkUpdateMillis = checkMillis; + this.monitor = monitor; + + Object currentState; + try { + currentState = monitor.getCurrentState(path); + } catch (final IOException e) { + currentState = null; + } + + this.lastState = new AtomicReference<>(new StateWrapper(currentState)); + } + + /** + * Checks if the file has been updated according to the configured + * {@link UpdateMonitor} and resets the state + * + * @return + * @throws IOException + */ + public boolean checkAndReset() throws IOException { + if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check + 
return checkForUpdate(); + } else { + final StateWrapper stateWrapper = lastState.get(); + if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) { + return checkForUpdate(); + } + return false; + } + } + + private boolean checkForUpdate() throws IOException { + if (resourceLock.tryLock()) { + try { + final StateWrapper wrapper = lastState.get(); + final Object newState = monitor.getCurrentState(path); + if (newState == null && wrapper.getState() == null) { + return false; + } + if (newState == null || wrapper.getState() == null) { + lastState.set(new StateWrapper(newState)); + return true; + } + + final boolean unmodified = newState.equals(wrapper.getState()); + if (!unmodified) { + lastState.set(new StateWrapper(newState)); + } + return !unmodified; + } finally { + resourceLock.unlock(); + } + } else { + return false; + } + } + + private static class StateWrapper { + + private final Object state; + private final long timestamp; + + public StateWrapper(final Object state) { + this.state = state; + this.timestamp = System.currentTimeMillis(); + } + + public Object getState() { + return state; + } + + public long getTimestamp() { + return timestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java new file mode 100644 index 0000000..20ed1dd --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Strategy for capturing a snapshot of a file's state. Two snapshots of the
 * same file are compared with {@code equals()} (see
 * {@link SynchronousFileWatcher}), so implementations should return objects
 * whose {@code equals()} reflects whether the file has changed.
 */
public interface UpdateMonitor {

    /**
     * Captures the current state of the given file.
     *
     * @param path the file to inspect
     * @return an object representing the file's current state
     * @throws IOException if the state cannot be determined
     */
    Object getCurrentState(Path path) throws IOException;
}
/**
 * Defines an interface to search for content given a set of search terms. Any
 * implementation of search must be thread safe.
 *
 * @param <T> type of the reference object carried by each {@link SearchTerm}
 */
public interface Search<T> {

    /**
     * Establishes the dictionary of terms which will be searched in subsequent
     * search calls. This can be called only once.
     *
     * @param terms the terms to search for
     */
    void initializeDictionary(Set<SearchTerm<T>> terms);

    /**
     * Searches the given input stream for matches between the already specified
     * dictionary and the contents scanned.
     *
     * @param haystack the stream whose content is scanned
     * @param findAll if true will find all matches; if false will find only
     * the first match
     * @return SearchState containing the results; its result map might be
     * empty, which indicates no matches were found, but will not be null
     * @throws IOException Thrown for any exceptions occurring while searching.
     * @throws IllegalStateException if the dictionary has not yet been
     * initialized
     */
    SearchState<T> search(InputStream haystack, boolean findAll) throws IOException;

}
/**
 * An immutable, thread-safe search term: a non-empty sequence of bytes plus an
 * optional caller-supplied reference object. Equality and hash code are based
 * on the bytes only; the reference does not take part.
 *
 * @param <T> type of the reference object associated with the term
 */
public class SearchTerm<T> {

    private final byte[] bytes;
    private final int hashCode;
    private final T reference;

    /**
     * Constructs a SearchTerm with no reference object. Defensively copies the
     * given byte array.
     *
     * @param bytes the bytes of the term
     * @throws IllegalArgumentException if given bytes are null or 0 length
     */
    public SearchTerm(final byte[] bytes) {
        this(bytes, true, null);
    }

    /**
     * Constructs a search term. Optionally performs a defensive copy of the
     * given byte array. If the caller indicates a defensive copy is not
     * necessary then they must not modify the given array afterwards.
     *
     * @param bytes the bytes of the term
     * @param defensiveCopy whether to copy {@code bytes} instead of keeping
     * the caller's array
     * @param reference object to associate with this term; may be null
     * @throws IllegalArgumentException if given bytes are null or 0 length
     */
    public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) {
        if (bytes == null || bytes.length == 0) {
            throw new IllegalArgumentException("Search term bytes must not be null or empty");
        }
        this.bytes = defensiveCopy ? Arrays.copyOf(bytes, bytes.length) : bytes;
        // the term is immutable, so the hash can be computed once
        this.hashCode = Arrays.hashCode(this.bytes);
        this.reference = reference;
    }

    /**
     * @param index position within the term
     * @return the byte at the given position as an unsigned value (0-255)
     */
    public int get(final int index) {
        return bytes[index] & 0xff;
    }

    /**
     * @return size of the search term in bytes
     */
    public int size() {
        return bytes.length;
    }

    /**
     * @return reference object for this given search term
     */
    public T getReference() {
        return reference;
    }

    /**
     * Determines whether the first {@code windowLength} bytes of this term are
     * identical to the first {@code windowLength} bytes of the given window.
     *
     * @param window Current window of bytes from the haystack being evaluated.
     * @param windowLength The number of leading bytes of the window to consider
     * @return true if this term begins with the considered bytes of the
     * window; false if they differ or if the window is longer than this term
     * @throws IndexOutOfBoundsException if {@code windowLength} is negative or
     * exceeds the length of {@code window}
     */
    public boolean startsWith(byte[] window, int windowLength) {
        if (windowLength < 0 || windowLength > window.length) {
            throw new IndexOutOfBoundsException(
                    "windowLength " + windowLength + " is outside [0, " + window.length + "]");
        }
        if (bytes.length < windowLength) {
            return false;
        }
        // bytes.length >= windowLength is guaranteed here, so one bound suffices
        for (int i = 0; i < windowLength; i++) {
            if (bytes[i] != window[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return a defensive copy of the internal byte structure
     */
    public byte[] getBytes() {
        return Arrays.copyOf(bytes, bytes.length);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    @Override
    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final SearchTerm<?> other = (SearchTerm<?>) obj;
        // cheap cached-hash comparison first, full byte comparison only if needed
        return this.hashCode == other.hashCode && Arrays.equals(this.bytes, other.bytes);
    }

    /**
     * @return the term's bytes decoded with the platform default charset; use
     * {@link #toString(Charset)} for a platform-independent result
     */
    @Override
    public String toString() {
        return new String(bytes, Charset.defaultCharset());
    }

    /**
     * @param charset the charset used to decode the term's bytes
     * @return the term's bytes decoded with the given charset
     */
    public String toString(final Charset charset) {
        return new String(bytes, charset);
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search.ahocorasick; + +import java.io.IOException; +import java.io.InputStream; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + +import org.apache.nifi.util.search.Search; +import org.apache.nifi.util.search.SearchTerm; + +public class AhoCorasick<T> implements Search<T> { + + private Node root = null; + + /** + * Constructs a new search object. 
+ * + * @throws IllegalArgumentException if given terms are null or empty + */ + public AhoCorasick() { + } + + @Override + public void initializeDictionary(final Set<SearchTerm<T>> terms) { + if (root != null) { + throw new IllegalStateException(); + } + root = new Node(); + if (terms == null || terms.isEmpty()) { + throw new IllegalArgumentException(); + } + for (final SearchTerm<T> term : terms) { + int i = 0; + Node nextNode = root; + while (true) { + nextNode = addMatch(term, i, nextNode); + if (nextNode == null) { + break; //we're done + } + i++; + } + } + initialize(); + } + + private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) { + final int index = term.get(offset); + boolean atEnd = (offset == (term.size() - 1)); + if (current.getNeighbor(index) == null) { + if (atEnd) { + current.setNeighbor(new Node(term), index); + return null; + } + current.setNeighbor(new Node(), index); + } else if (atEnd) { + current.getNeighbor(index).setMatchingTerm(term); + return null; + } + return current.getNeighbor(index); + } + + private void initialize() { + //perform bgs to build failure links + final Queue<Node> queue = new LinkedList<>(); + queue.add(root); + root.setFailureNode(null); + while (!queue.isEmpty()) { + final Node current = queue.poll(); + for (int i = 0; i < 256; i++) { + final Node next = current.getNeighbor(i); + if (next != null) { + //traverse failure to get state + Node fail = current.getFailureNode(); + while ((fail != null) && fail.getNeighbor(i) == null) { + fail = fail.getFailureNode(); + } + if (fail != null) { + next.setFailureNode(fail.getNeighbor(i)); + } else { + next.setFailureNode(root); + } + queue.add(next); + } + } + } + } + + @Override + public SearchState search(final InputStream stream, final boolean findAll) throws IOException { + return search(stream, findAll, null); + } + + private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException { + if 
(root == null) { + throw new IllegalStateException(); + } + final SearchState<T> currentState = (state == null) ? new SearchState(root) : state; + if (!findAll && currentState.foundMatch()) { + throw new IllegalStateException("A match has already been found yet we're being asked to keep searching"); + } + Node current = currentState.getCurrentNode(); + int currentChar; + while ((currentChar = stream.read()) >= 0) { + currentState.incrementBytesRead(1L); + Node next = current.getNeighbor(currentChar); + if (next == null) { + next = current.getFailureNode(); + while ((next != null) && next.getNeighbor(currentChar) == null) { + next = next.getFailureNode(); + } + if (next != null) { + next = next.getNeighbor(currentChar); + } else { + next = root; + } + } + if (next == null) { + throw new IllegalStateException("tree out of sync"); + } + //Accept condition + if (next.hasMatch()) { + currentState.addResult(next.getMatchingTerm()); + } + for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) { + if (failNode.hasMatch()) { + currentState.addResult(failNode.getMatchingTerm()); + } + } + current = next; + if (currentState.foundMatch() && !findAll) { + break;//give up as soon as we have at least one match + } + } + currentState.setCurrentNode(current); + return currentState; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java new file mode 100644 index 0000000..0ac325c --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search.ahocorasick; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.util.search.SearchTerm; + +/** + * + * @author + */ +public class Node { + + private final Map<Integer, Node> neighborMap; + private Node failureNode; + private SearchTerm<?> term; + + Node(final SearchTerm<?> term) { + this(); + this.term = term; + } + + Node() { + neighborMap = new HashMap<>(); + term = null; + } + + void setFailureNode(final Node fail) { + failureNode = fail; + } + + public Node getFailureNode() { + return failureNode; + } + + public boolean hasMatch() { + return term != null; + } + + void setMatchingTerm(final SearchTerm<?> term) { + this.term = term; + } + + public SearchTerm<?> getMatchingTerm() { + return term; + } + + public Node getNeighbor(final int index) { + return neighborMap.get(index); + } + + void setNeighbor(final Node neighbor, final int index) { + neighborMap.put(index, neighbor); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java ---------------------------------------------------------------------- diff --git 
a/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java new file mode 100644 index 0000000..6d36ad0 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.search.ahocorasick; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.search.SearchTerm; + +public class SearchState<T> { + + private Node currentNode; + private final Map<SearchTerm<T>, List<Long>> resultMap; + private long bytesRead; + + SearchState(final Node rootNode) { + resultMap = new HashMap<>(5); + currentNode = rootNode; + bytesRead = 0L; + } + + void incrementBytesRead(final long increment) { + bytesRead += increment; + } + + void setCurrentNode(final Node curr) { + currentNode = curr; + } + + public Node getCurrentNode() { + return currentNode; + } + + public Map<SearchTerm<T>, List<Long>> getResults() { + return new HashMap<>(resultMap); + } + + void addResult(final SearchTerm matchingTerm) { + final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5); + indexes.add(bytesRead); + resultMap.put(matchingTerm, indexes); + } + + public boolean foundMatch() { + return !resultMap.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java new file mode 100644 index 0000000..bd30a96 --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; + +import org.junit.Test; + +public class TestCompressionInputOutputStreams { + + @Test + public void testSimple() throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] data = "Hello, World!".getBytes("UTF-8"); + + final CompressionOutputStream cos = new CompressionOutputStream(baos); + cos.write(data); + cos.flush(); + cos.close(); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data, decompressed)); + } + + @Test + public void testDataLargerThanBuffer() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100; i++) { + sb.append(str); + } + final byte[] data = sb.toString().getBytes("UTF-8"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final 
CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + cos.write(data); + cos.flush(); + cos.close(); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data, decompressed)); + } + + @Test + public void testDataLargerThanBufferWhileFlushing() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + final byte[] data = str.getBytes("UTF-8"); + + final StringBuilder sb = new StringBuilder(); + final byte[] data1024; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 1024; i++) { + cos.write(data); + cos.flush(); + sb.append(str); + } + cos.close(); + data1024 = sb.toString().getBytes("UTF-8"); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data1024, decompressed)); + } + + @Test + public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + final byte[] data = str.getBytes("UTF-8"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 512; i++) { + cos.write(data); + cos.flush(); + } + cos.close(); + + final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 512; i++) { + cos2.write(data); + cos2.flush(); + } + cos2.close(); + + final byte[] data512; + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 512; i++) { + sb.append(str); + } + 
data512 = sb.toString().getBytes("UTF-8"); + + final byte[] compressedBytes = baos.toByteArray(); + final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes); + + final CompressionInputStream cis = new CompressionInputStream(bais); + final byte[] decompressed = readFully(cis); + assertTrue(Arrays.equals(data512, decompressed)); + + final CompressionInputStream cis2 = new CompressionInputStream(bais); + final byte[] decompressed2 = readFully(cis2); + assertTrue(Arrays.equals(data512, decompressed2)); + } + + private byte[] readFully(final InputStream in) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] buffer = new byte[65536]; + int len; + while ((len = in.read(buffer)) >= 0) { + baos.write(buffer, 0, len); + } + + return baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java new file mode 100644 index 0000000..52bd8de --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Timing tests for {@link LeakyBucketStreamThrottler}. Every test measures
 * elapsed wall-clock time against a throttle of 1 MB/sec, which is why the
 * class is ignored by default.
 */
@Ignore("Tests are time-based")
public class TestLeakyBucketThrottler {

    /**
     * Writes 4 MB through a throttled output stream and checks the elapsed
     * time.
     */
    @Test(timeout = 10000)
    public void testOutputStreamInterface() throws IOException {
        // throttle rate at 1 MB/sec
        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);

        final byte[] data = new byte[1024 * 1024 * 4];
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final OutputStream throttledOut = throttler.newThrottledOutputStream(baos);

        final long start = System.currentTimeMillis();
        throttledOut.write(data);
        throttler.close();
        final long millis = System.currentTimeMillis() - start;
        // 4 MB at 1 MB/sec: should take 4 sec give or take
        assertTrue(millis > 3000);
        assertTrue(millis < 6000);
    }

    /**
     * Reads 4 MB through a throttled input stream in 4 KB chunks and checks
     * the elapsed time.
     */
    @Test(timeout = 10000)
    public void testInputStreamInterface() throws IOException {
        // throttle rate at 1 MB/sec
        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);

        final byte[] data = new byte[1024 * 1024 * 4];
        final ByteArrayInputStream bais = new ByteArrayInputStream(data);
        final InputStream throttledIn = throttler.newThrottledInputStream(bais);

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();

        final byte[] buffer = new byte[4096];
        final long start = System.currentTimeMillis();
        int len;
        while ((len = throttledIn.read(buffer)) > 0) {
            baos.write(buffer, 0, len);
        }
        throttler.close();
        final long millis = System.currentTimeMillis() - start;
        // 4 MB at 1 MB/sec: should take 4 sec give or take
        assertTrue(millis > 3000);
        assertTrue(millis < 6000);
        baos.close();
    }

    /**
     * Has three threads copy data through the same throttler concurrently and
     * checks both the total elapsed time and the number of bytes copied.
     */
    @Test(timeout = 10000)
    public void testDirectInterface() throws IOException, InterruptedException {
        // throttle rate at 1 MB/sec
        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);

        // create 3 threads, each sending ~2 MB
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < 3; i++) {
            final Thread t = new WriterThread(i, throttler, baos);
            threads.add(t);
        }

        final long start = System.currentTimeMillis();
        for (final Thread t : threads) {
            t.start();
        }

        for (final Thread t : threads) {
            t.join();
        }
        final long elapsed = System.currentTimeMillis() - start;

        throttler.close();

        // The threads send ~6 MB in total (3 x ~2 MB). At 1 MB/sec it should have taken at least
        // 5 seconds and no more than 7 seconds, to allow for busy-ness and the fact that we could
        // write a tiny bit more than the limit.
        assertTrue(elapsed > 5000);
        assertTrue(elapsed < 7000);

        // ensure bytes were copied out appropriately: every thread's buffer ends with 'A'
        assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
        assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
    }

    /**
     * Thread that copies a buffer of 2 MB + 1 byte (the last byte being 'A')
     * to the shared output through the throttler and prints the achieved rate.
     */
    private static class WriterThread extends Thread {

        private final int idx;
        private final byte[] data = new byte[1024 * 1024 * 2 + 1];
        private final LeakyBucketStreamThrottler throttler;
        private final OutputStream out;

        public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) {
            this.idx = idx;
            this.throttler = throttler;
            this.out = out;
            // marker byte checked by testDirectInterface
            this.data[this.data.length - 1] = (byte) 'A';
        }

        @Override
        public void run() {
            long startMillis = System.currentTimeMillis();
            long bytesWritten = 0L;
            try {
                throttler.copy(new ByteArrayInputStream(data), out);
            } catch (IOException e) {
                // a failed copy is only reported here; the thread ends without printing a rate
                e.printStackTrace();
                return;
            }
            long now = System.currentTimeMillis();
            long millisElapsed = now - startMillis;
            bytesWritten += data.length;
            float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F;
            System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec");
        }
    }

}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class TestNaiveSearchRingBuffer { + + @Test + public void testAddAndCompare() { + final byte[] pattern = new byte[]{ + '\r', '0', 38, 48 + }; + + final byte[] search = new byte[]{ + '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38 + }; + + final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern); + int counter = -1; + for (final byte b : search) { + counter++; + final boolean matched = circ.addAndCompare(b); + if (counter == 10) { + assertTrue(matched); + } else { + assertFalse(matched); + } + } + } + + @Test + public void testGetOldestByte() { + final byte[] pattern = new byte[]{ + '\r', '0', 38, 48 + }; + + final byte[] search = new byte[]{ + '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38 + }; + + final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern); + int counter = -1; + for (final byte b : search) { + counter++; + final boolean matched = circ.addAndCompare(b); + if (counter == 13) { + assertTrue(matched); + } else { + assertFalse(matched); + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java new file mode 100644 index 0000000..f576e94 --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.file.monitor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.UUID; + + +import org.junit.Test; + +public class TestCompoundUpdateMonitor { + + @Test + public void test() throws IOException { + final UpdateMonitor lastModified = new LastModifiedMonitor(); + final MD5SumMonitor md5 = new MD5SumMonitor(); + final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5); + + final File file = new File("target/" + UUID.randomUUID().toString()); + if (file.exists()) { + assertTrue(file.delete()); + } + assertTrue(file.createNewFile()); + + final Path path = file.toPath(); + + final Object curState = compound.getCurrentState(path); + final Object state2 = compound.getCurrentState(path); + + assertEquals(curState, state2); + file.setLastModified(System.currentTimeMillis() + 1000L); + final Object state3 = compound.getCurrentState(path); + assertEquals(state2, state3); + + final Object state4 = compound.getCurrentState(path); + assertEquals(state3, state4); + + final long lastModifiedDate = file.lastModified(); + try (final OutputStream out = new FileOutputStream(file)) { + out.write("Hello".getBytes("UTF-8")); + } + + file.setLastModified(lastModifiedDate); + + final Object state5 = compound.getCurrentState(path); + assertNotSame(state4, state5); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java new file mode 100644 index 0000000..7125581 --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.file.monitor; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + +import org.junit.Test; + + +public class TestSynchronousFileWatcher { + + @Test + public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException { + final Path path = Paths.get("target/1.txt"); + Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING); + final UpdateMonitor monitor = new MD5SumMonitor(); + + final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L); + assertFalse(watcher.checkAndReset()); + Thread.sleep(30L); + assertFalse(watcher.checkAndReset()); + + final FileOutputStream fos = new FileOutputStream(path.toFile()); + try { + fos.write("Good-bye, World!".getBytes("UTF-8")); + fos.getFD().sync(); + } finally { + fos.close(); + } + + assertTrue(watcher.checkAndReset()); + assertFalse(watcher.checkAndReset()); + + Thread.sleep(30L); + assertFalse(watcher.checkAndReset()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java deleted file mode 100644 index c796a96..0000000 --- a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestCompoundUpdateMonitor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.timebuffer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.Path; -import java.util.UUID; - -import org.apache.nifi.io.CompoundUpdateMonitor; -import org.apache.nifi.io.LastModifiedMonitor; -import org.apache.nifi.io.MD5SumMonitor; -import org.apache.nifi.io.UpdateMonitor; - -import org.junit.Test; - -public class TestCompoundUpdateMonitor { - - @Test - public void test() throws IOException { - final UpdateMonitor lastModified = new LastModifiedMonitor(); - final MD5SumMonitor md5 = new MD5SumMonitor(); - final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5); - - final File file = new File("target/" + UUID.randomUUID().toString()); - if (file.exists()) { - assertTrue(file.delete()); - } - assertTrue(file.createNewFile()); - - final Path path = file.toPath(); - - final Object curState = compound.getCurrentState(path); - final Object state2 = compound.getCurrentState(path); - - assertEquals(curState, state2); - 
file.setLastModified(System.currentTimeMillis() + 1000L); - final Object state3 = compound.getCurrentState(path); - assertEquals(state2, state3); - - final Object state4 = compound.getCurrentState(path); - assertEquals(state3, state4); - - final long lastModifiedDate = file.lastModified(); - try (final OutputStream out = new FileOutputStream(file)) { - out.write("Hello".getBytes("UTF-8")); - } - - file.setLastModified(lastModifiedDate); - - final Object state5 = compound.getCurrentState(path); - assertNotSame(state4, state5); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java deleted file mode 100644 index 4b2c0d5..0000000 --- a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.util.timebuffer; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; - -import org.junit.Test; - -import org.apache.nifi.io.MD5SumMonitor; -import org.apache.nifi.io.SynchronousFileWatcher; -import org.apache.nifi.io.UpdateMonitor; - -public class TestSynchronousFileWatcher { - - @Test - public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException { - final Path path = Paths.get("target/1.txt"); - Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING); - final UpdateMonitor monitor = new MD5SumMonitor(); - - final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L); - assertFalse(watcher.checkAndReset()); - Thread.sleep(30L); - assertFalse(watcher.checkAndReset()); - - final FileOutputStream fos = new FileOutputStream(path.toFile()); - try { - fos.write("Good-bye, World!".getBytes("UTF-8")); - fos.getFD().sync(); - } finally { - fos.close(); - } - - assertTrue(watcher.checkAndReset()); - assertFalse(watcher.checkAndReset()); - - Thread.sleep(30L); - assertFalse(watcher.checkAndReset()); - } -}