http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java deleted file mode 100644 index 41a0557..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java +++ /dev/null @@ -1,623 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.util.file; - -import java.io.BufferedInputStream; -import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.nio.file.Files; -import java.nio.file.Path; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Random; - -import org.slf4j.Logger; - -/** - * A utility class containing a few useful static methods to do typical IO - * operations. - * - * @author unattributed - */ -public class FileUtils { - - public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 MB chunks - public static final long MILLIS_BETWEEN_ATTEMPTS = 50L; - - /** - * Closes the given closeable quietly - no logging, no exceptions... 
- * - * @param closeable - */ - public static void closeQuietly(final Closeable closeable) { - if (null != closeable) { - try { - closeable.close(); - } catch (final IOException io) {/*IGNORE*/ - - } - } - } - - /** - * Releases the given lock quietly - no logging, no exception - * - * @param lock - */ - public static void releaseQuietly(final FileLock lock) { - if (null != lock) { - try { - lock.release(); - } catch (final IOException io) { - /*IGNORE*/ - } - } - } - - public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException { - if (dir.exists() && !dir.isDirectory()) { - throw new IOException(dir.getAbsolutePath() + " is not a directory"); - } else if (!dir.exists()) { - final boolean made = dir.mkdirs(); - if (!made) { - throw new IOException(dir.getAbsolutePath() + " could not be created"); - } - } - if (!(dir.canRead() && dir.canWrite())) { - throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege"); - } - } - - /** - * Deletes the given file. If the given file exists but could not be deleted - * this will be printed as a warning to the given logger - * - * @param file - * @param logger - * @return - */ - public static boolean deleteFile(final File file, final Logger logger) { - return FileUtils.deleteFile(file, logger, 1); - } - - /** - * Deletes the given file. 
If the given file exists but could not be deleted - * this will be printed as a warning to the given logger - * - * @param file - * @param logger - * @param attempts indicates how many times an attempt to delete should be - * made - * @return true if given file no longer exists - */ - public static boolean deleteFile(final File file, final Logger logger, final int attempts) { - if (file == null) { - return false; - } - boolean isGone = false; - try { - if (file.exists()) { - final int effectiveAttempts = Math.max(1, attempts); - for (int i = 0; i < effectiveAttempts && !isGone; i++) { - isGone = file.delete() || !file.exists(); - if (!isGone && (effectiveAttempts - i) > 1) { - FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS); - } - } - if (!isGone && logger != null) { - logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath()); - } - } - } catch (final Throwable t) { - if (logger != null) { - logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t); - } - } - return isGone; - } - - /** - * Deletes all of the given files. If any exist and cannot be deleted that - * will be printed at warn to the given logger. - * - * @param files can be null - * @param logger can be null - */ - public static void deleteFile(final List<File> files, final Logger logger) { - FileUtils.deleteFile(files, logger, 1); - } - - /** - * Deletes all of the given files. If any exist and cannot be deleted that - * will be printed at warn to the given logger. 
- * - * @param files can be null - * @param logger can be null - * @param attempts indicates how many times an attempt should be made to - * delete each file - */ - public static void deleteFile(final List<File> files, final Logger logger, final int attempts) { - if (null == files || files.isEmpty()) { - return; - } - final int effectiveAttempts = Math.max(1, attempts); - for (final File file : files) { - try { - boolean isGone = false; - for (int i = 0; i < effectiveAttempts && !isGone; i++) { - isGone = file.delete() || !file.exists(); - if (!isGone && (effectiveAttempts - i) > 1) { - FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS); - } - } - if (!isGone && logger != null) { - logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath()); - } - } catch (final Throwable t) { - if (null != logger) { - logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t); - } - } - } - } - - /** - * Deletes all files (not directories..) in the given directory (non - * recursive) that match the given filename filter. If any file cannot be - * deleted then this is printed at warn to the given logger. - * - * @param directory - * @param filter if null then no filter is used - * @param logger - */ - public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) { - FileUtils.deleteFilesInDir(directory, filter, logger, false); - } - - /** - * Deletes all files (not directories) in the given directory (recursive) - * that match the given filename filter. If any file cannot be deleted then - * this is printed at warn to the given logger. 
- * - * @param directory - * @param filter if null then no filter is used - * @param logger - * @param recurse - */ - public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) { - FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false); - } - - /** - * Deletes all files (not directories) in the given directory (recursive) - * that match the given filename filter. If any file cannot be deleted then - * this is printed at warn to the given logger. - * - * @param directory - * @param filter if null then no filter is used - * @param logger - * @param recurse - * @param deleteEmptyDirectories default is false; if true will delete - * directories found that are empty - */ - public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) { - // ensure the specified directory is actually a directory and that it exists - if (null != directory && directory.isDirectory()) { - final File ingestFiles[] = directory.listFiles(); - for (File ingestFile : ingestFiles) { - boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName()); - if (ingestFile.isFile() && process) { - FileUtils.deleteFile(ingestFile, logger, 3); - } - if (ingestFile.isDirectory() && recurse) { - FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories); - if (deleteEmptyDirectories && ingestFile.list().length == 0) { - FileUtils.deleteFile(ingestFile, logger, 3); - } - } - } - } - } - - /** - * Deletes given files. 
- * - * @param files - * @param recurse will recurse - * @throws IOException - */ - public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException { - for (final File file : files) { - FileUtils.deleteFile(file, recurse); - } - } - - public static void deleteFile(final File file, final boolean recurse) throws IOException { - if (file.isDirectory() && recurse) { - FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse); - } - //now delete the file itself regardless of whether it is plain file or a directory - if (!FileUtils.deleteFile(file, null, 5)) { - throw new IOException("Unable to delete " + file.getAbsolutePath()); - } - } - - /** - * Randomly generates a sequence of bytes and overwrites the contents of the - * file a number of times. The file is then deleted. - * - * @param file File to be overwritten a number of times and, ultimately, - * deleted - * @param passes Number of times file should be overwritten - * @throws IOException if something makes shredding or deleting a problem - */ - public static void shredFile(final File file, final int passes) - throws IOException { - final Random generator = new Random(); - final long fileLength = file.length(); - final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB - final byte[] b = new byte[byteArraySize]; - final long numOfRandomWrites = (fileLength / b.length) + 1; - final FileOutputStream fos = new FileOutputStream(file); - try { - // Over write file contents (passes) times - final FileChannel channel = fos.getChannel(); - for (int i = 0; i < passes; i++) { - generator.nextBytes(b); - for (int j = 0; j <= numOfRandomWrites; j++) { - fos.write(b); - } - fos.flush(); - channel.position(0); - } - // Write out "0" for each byte in the file - Arrays.fill(b, (byte) 0); - for (int j = 0; j < numOfRandomWrites; j++) { - fos.write(b); - } - fos.flush(); - fos.close(); - // Try to delete the file a few times - if (!FileUtils.deleteFile(file, null, 5)) { - 
throw new IOException("Failed to delete file after shredding"); - } - - } finally { - FileUtils.closeQuietly(fos); - } - } - - public static long copy(final InputStream in, final OutputStream out) throws IOException { - final byte[] buffer = new byte[65536]; - long copied = 0L; - int len; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - copied += len; - } - - return copied; - } - - public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException { - FileOutputStream fos = null; - FileLock outLock = null; - long fileSize = 0L; - try { - fos = new FileOutputStream(destination); - final FileChannel out = fos.getChannel(); - if (lockOutputFile) { - outLock = out.tryLock(0, Long.MAX_VALUE, false); - if (null == outLock) { - throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath()); - } - } - fos.write(bytes); - fos.flush(); - fileSize = bytes.length; - } finally { - FileUtils.releaseQuietly(outLock); - FileUtils.closeQuietly(fos); - } - return fileSize; - } - - /** - * Copies the given source file to the given destination file. The given - * destination will be overwritten if it already exists. - * - * @param source - * @param destination - * @param lockInputFile if true will lock input file during copy; if false - * will not - * @param lockOutputFile if true will lock output file during copy; if false - * will not - * @param move if true will perform what is effectively a move operation - * rather than a pure copy. This allows for potentially highly efficient - * movement of the file but if not possible this will revert to a copy then - * delete behavior. If false, then the file is copied and the source file is - * retained. If a true rename/move occurs then no lock is held during that - * time. - * @param logger if failures occur, they will be logged to this logger if - * possible. 
If this logger is null, an IOException will instead be thrown, - * indicating the problem. - * @return long number of bytes copied - * @throws FileNotFoundException if the source file could not be found - * @throws IOException - * @throws SecurityException if a security manager denies the needed file - * operations - */ - public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final boolean move, final Logger logger) throws FileNotFoundException, IOException { - - FileInputStream fis = null; - FileOutputStream fos = null; - FileLock inLock = null; - FileLock outLock = null; - long fileSize = 0L; - if (!source.canRead()) { - throw new IOException("Must at least have read permission"); - - } - if (move && source.renameTo(destination)) { - fileSize = destination.length(); - } else { - try { - fis = new FileInputStream(source); - fos = new FileOutputStream(destination); - final FileChannel in = fis.getChannel(); - final FileChannel out = fos.getChannel(); - if (lockInputFile) { - inLock = in.tryLock(0, Long.MAX_VALUE, true); - if (null == inLock) { - throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath()); - } - } - if (lockOutputFile) { - outLock = out.tryLock(0, Long.MAX_VALUE, false); - if (null == outLock) { - throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath()); - } - } - long bytesWritten = 0; - do { - bytesWritten += out.transferFrom(in, bytesWritten, TRANSFER_CHUNK_SIZE_BYTES); - fileSize = in.size(); - } while (bytesWritten < fileSize); - out.force(false); - FileUtils.closeQuietly(fos); - FileUtils.closeQuietly(fis); - fos = null; - fis = null; - if (move && !FileUtils.deleteFile(source, null, 5)) { - if (logger == null) { - FileUtils.deleteFile(destination, null, 5); - throw new IOException("Could not remove file " + source.getAbsolutePath()); - } else { - logger.warn("Configured to delete source file 
when renaming/move not successful. However, unable to delete file at: " + source.getAbsolutePath()); - } - } - } finally { - FileUtils.releaseQuietly(inLock); - FileUtils.releaseQuietly(outLock); - FileUtils.closeQuietly(fos); - FileUtils.closeQuietly(fis); - } - } - return fileSize; - } - - /** - * Copies the given source file to the given destination file. The given - * destination will be overwritten if it already exists. - * - * @param source - * @param destination - * @param lockInputFile if true will lock input file during copy; if false - * will not - * @param lockOutputFile if true will lock output file during copy; if false - * will not - * @param logger - * @return long number of bytes copied - * @throws FileNotFoundException if the source file could not be found - * @throws IOException - * @throws SecurityException if a security manager denies the needed file - * operations - */ - public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException { - return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger); - } - - public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException { - FileInputStream fis = null; - FileLock inLock = null; - long fileSize = 0L; - try { - fis = new FileInputStream(source); - final FileChannel in = fis.getChannel(); - if (lockInputFile) { - inLock = in.tryLock(0, Long.MAX_VALUE, true); - if (inLock == null) { - throw new IOException("Unable to obtain exclusive file lock for: " + source.getAbsolutePath()); - } - - } - - byte[] buffer = new byte[1 << 18]; //256 KB - int bytesRead = -1; - while ((bytesRead = fis.read(buffer)) != -1) { - stream.write(buffer, 0, bytesRead); - } - in.force(false); - stream.flush(); - fileSize = in.size(); - } finally { - 
FileUtils.releaseQuietly(inLock); - FileUtils.closeQuietly(fis); - if (closeOutputStream) { - FileUtils.closeQuietly(stream); - } - } - return fileSize; - } - - public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException { - final Path destPath = destination.toPath(); - final long size = Files.copy(stream, destPath); - if (closeInputStream) { - stream.close(); - } - return size; - } - - /** - * Renames the given file from the source path to the destination path. This - * handles multiple attempts. This should only be used to rename within a - * given directory. Renaming across directories might not work well. See the - * <code>File.renameTo</code> for more information. - * - * @param source the file to rename - * @param destination the file path to rename to - * @param maxAttempts the max number of attempts to attempt the rename - * @throws IOException if rename isn't successful - */ - public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException { - FileUtils.renameFile(source, destination, maxAttempts, false); - } - - /** - * Renames the given file from the source path to the destination path. This - * handles multiple attempts. This should only be used to rename within a - * given directory. Renaming across directories might not work well. See the - * <code>File.renameTo</code> for more information. - * - * @param source the file to rename - * @param destination the file path to rename to - * @param maxAttempts the max number of attempts to attempt the rename - * @param replace if true and a rename attempt fails will check if a file is - * already at the destination path. If so it will delete that file and - * attempt the rename according the remaining maxAttempts. If false, any - * conflicting files will be left as they were and the rename attempts will - * fail if conflicting. 
- * @throws IOException if rename isn't successful - */ - public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException { - final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts; - boolean renamed = false; - for (int i = 0; i < attempts; i++) { - renamed = source.renameTo(destination); - if (!renamed) { - FileUtils.deleteFile(destination, null, 5); - } else { - break; //rename has succeeded - } - } - if (!renamed) { - throw new IOException("Attempted " + maxAttempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'"); - - } - } - - public static void sleepQuietly(final long millis) { - try { - Thread.sleep(millis); - } catch (final InterruptedException ex) { - /* do nothing */ - } - } - - /** - * Syncs a primary copy of a file with the copy in the restore directory. If - * the restore directory does not have a file and the primary has a file, - * the the primary's file is copied to the restore directory. Else if the - * restore directory has a file, but the primary does not, then the - * restore's file is copied to the primary directory. Else if the primary - * file is different than the restore file, then an IllegalStateException is - * thrown. Otherwise, if neither file exists, then no syncing is performed. 
- * - * @param primaryFile the primary file - * @param restoreFile the restore file - * @param logger a logger - * @throws IOException if an I/O problem was encountered during syncing - * @throws IllegalStateException if the primary and restore copies exist but - * are different - */ - public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger) - throws IOException { - - if (primaryFile.exists() && !restoreFile.exists()) { - // copy primary file to restore - copyFile(primaryFile, restoreFile, false, false, logger); - } else if (restoreFile.exists() && !primaryFile.exists()) { - // copy restore file to primary - copyFile(restoreFile, primaryFile, false, false, logger); - } else if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) { - throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", - primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); - } - } - - /** - * Returns true if the given files are the same according to their MD5 hash. - * - * @param file1 a file - * @param file2 a file - * @return true if the files are the same; false otherwise - * @throws IOException if the MD5 hash could not be computed - */ - public static boolean isSame(final File file1, final File file2) throws IOException { - return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2)); - } - - /** - * Returns the MD5 hash of the given file. 
- * - * @param file a file - * @return the MD5 hash - * @throws IOException if the MD5 hash could not be computed - */ - public static byte[] computeMd5Digest(final File file) throws IOException { - final MessageDigest digest; - try { - digest = MessageDigest.getInstance("MD5"); - } catch (final NoSuchAlgorithmException nsae) { - throw new IOException(nsae); - } - - try (final FileInputStream fis = new FileInputStream(file)) { - int len; - final byte[] buffer = new byte[8192]; - while ((len = fis.read(buffer)) > -1) { - if (len > 0) { - digest.update(buffer, 0, len); - } - } - } - return digest.digest(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java deleted file mode 100644 index 6f9c616..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.file.monitor; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; - -/** - * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s - * such that it will indicate a change in a file only if ALL sub-monitors - * indicate a change. 
The sub-monitors will be applied in the order given and if - * any indicates that the state has not changed, the subsequent sub-monitors may - * not be given a chance to run - */ -public class CompoundUpdateMonitor implements UpdateMonitor { - - private final List<UpdateMonitor> monitors; - - public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) { - monitors = new ArrayList<>(); - monitors.add(first); - for (final UpdateMonitor monitor : others) { - monitors.add(monitor); - } - } - - @Override - public Object getCurrentState(final Path path) throws IOException { - return new DeferredMonitorAction(monitors, path); - } - - private static class DeferredMonitorAction { - - private static final Object NON_COMPUTED_VALUE = new Object(); - - private final List<UpdateMonitor> monitors; - private final Path path; - - private final Object[] preCalculated; - - public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) { - this.monitors = monitors; - this.path = path; - preCalculated = new Object[monitors.size()]; - - for (int i = 0; i < preCalculated.length; i++) { - preCalculated[i] = NON_COMPUTED_VALUE; - } - } - - private Object getCalculatedValue(final int i) throws IOException { - if (preCalculated[i] == NON_COMPUTED_VALUE) { - preCalculated[i] = monitors.get(i).getCurrentState(path); - } - - return preCalculated[i]; - } - - @Override - public boolean equals(final Object obj) { - // must return true unless ALL DeferredMonitorAction's indicate that they are different - if (obj == null) { - return false; - } - - if (!(obj instanceof DeferredMonitorAction)) { - return false; - } - - final DeferredMonitorAction other = (DeferredMonitorAction) obj; - try { - // Go through each UpdateMonitor's value and check if the value has changed. 
- for (int i = 0; i < preCalculated.length; i++) { - final Object mine = getCalculatedValue(i); - final Object theirs = other.getCalculatedValue(i); - - if (mine == theirs) { - // same - return true; - } - - if (mine == null && theirs == null) { - // same - return true; - } - - if (mine.equals(theirs)) { - return true; - } - } - } catch (final IOException e) { - return false; - } - - // No DeferredMonitorAction was the same as last time. Therefore, it's not equal - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java deleted file mode 100644 index e6be558..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.util.file.monitor; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -public class LastModifiedMonitor implements UpdateMonitor { - - @Override - public Object getCurrentState(final Path path) throws IOException { - return Files.getLastModifiedTime(path); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java deleted file mode 100644 index 8dea4bf..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.util.file.monitor; - -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - -public class MD5SumMonitor implements UpdateMonitor { - - @Override - public Object getCurrentState(final Path path) throws IOException { - final MessageDigest digest; - try { - digest = MessageDigest.getInstance("MD5"); - } catch (final NoSuchAlgorithmException nsae) { - throw new AssertionError(nsae); - } - - try (final FileInputStream fis = new FileInputStream(path.toFile())) { - int len; - final byte[] buffer = new byte[8192]; - while ((len = fis.read(buffer)) > -1) { - if (len > 0) { - digest.update(buffer, 0, len); - } - } - } - - // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality - return ByteBuffer.wrap(digest.digest()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java deleted file mode 100644 index e0089c1..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.file.monitor; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Allows the user to configure a {@link java.nio.file.Path Path} to watch for - * modifications and periodically poll to check if the file has been modified - */ -public class SynchronousFileWatcher { - - private final Path path; - private final long checkUpdateMillis; - private final UpdateMonitor monitor; - private final AtomicReference<StateWrapper> lastState; - private final Lock resourceLock = new ReentrantLock(); - - public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) { - this(path, monitor, 0L); - } - - public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) { - if (checkMillis < 0) { - throw new IllegalArgumentException(); - } - - this.path = path; - checkUpdateMillis = checkMillis; - this.monitor = monitor; - - Object currentState; - try { - currentState = monitor.getCurrentState(path); - } catch (final IOException e) { - currentState = null; - } - - this.lastState = new AtomicReference<>(new StateWrapper(currentState)); - } - - /** - * Checks if the file has been updated according to the configured - * {@link UpdateMonitor} and resets the state - * - * @return - * @throws IOException - */ - public boolean checkAndReset() throws IOException { - if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check - 
return checkForUpdate(); - } else { - final StateWrapper stateWrapper = lastState.get(); - if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) { - return checkForUpdate(); - } - return false; - } - } - - private boolean checkForUpdate() throws IOException { - if (resourceLock.tryLock()) { - try { - final StateWrapper wrapper = lastState.get(); - final Object newState = monitor.getCurrentState(path); - if (newState == null && wrapper.getState() == null) { - return false; - } - if (newState == null || wrapper.getState() == null) { - lastState.set(new StateWrapper(newState)); - return true; - } - - final boolean unmodified = newState.equals(wrapper.getState()); - if (!unmodified) { - lastState.set(new StateWrapper(newState)); - } - return !unmodified; - } finally { - resourceLock.unlock(); - } - } else { - return false; - } - } - - private static class StateWrapper { - - private final Object state; - private final long timestamp; - - public StateWrapper(final Object state) { - this.state = state; - this.timestamp = System.currentTimeMillis(); - } - - public Object getState() { - return state; - } - - public long getTimestamp() { - return timestamp; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java deleted file mode 100644 index 20ed1dd..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
/**
 * Captures the state of a file in a form that can be compared with an earlier
 * capture. {@code SynchronousFileWatcher} compares successive results with
 * {@code equals()} to decide whether the file has changed.
 */
public interface UpdateMonitor {

    /**
     * Determines the current state of the given file.
     *
     * @param path the file to inspect
     * @return an object describing the file's current state
     * @throws IOException if the state cannot be determined
     */
    Object getCurrentState(Path path) throws IOException;
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.search; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Set; - -import org.apache.nifi.util.search.ahocorasick.SearchState; - -/** - * Defines an interface to search for content given a set of search terms. Any - * implementation of search must be thread safe. - * - * @author - * @param <T> - */ -public interface Search<T> { - - /** - * Establishes the dictionary of terms which will be searched in subsequent - * search calls. This can be called only once - * - * @param terms - */ - void initializeDictionary(Set<SearchTerm<T>> terms); - - /** - * Searches the given input stream for matches between the already specified - * dictionary and the contents scanned. - * - * @param haystack - * @param findAll if true will find all matches if false will find only the - * first match - * @return SearchState containing results Map might be empty which indicates - * no matches found but will not be null - * @throws IOException Thrown for any exceptions occurring while searching. 
/**
 * This is an immutable thread safe object representing a search term: a
 * non-empty sequence of bytes plus an optional reference object.
 * <p>
 * Equality and hash code are based on the bytes only; the reference object
 * does not take part in either.
 *
 * @param <T> type of the reference object attached to the term
 */
public class SearchTerm<T> {

    private final byte[] bytes;
    private final int hashCode;
    private final T reference;

    /**
     * Constructs a SearchTerm with no reference object. Defensively copies the
     * given byte array.
     *
     * @param bytes the bytes of the term
     * @throws IllegalArgumentException if given bytes are null or 0 length
     */
    public SearchTerm(final byte[] bytes) {
        this(bytes, true, null);
    }

    /**
     * Constructs a search term. Optionally performs a defensive copy of the
     * given byte array. If the caller indicates a defensive copy is not
     * necessary then they must not change the given array's state any longer.
     *
     * @param bytes the bytes of the term
     * @param defensiveCopy whether to copy {@code bytes} before storing them
     * @param reference object to associate with the term; may be null
     * @throws IllegalArgumentException if given bytes are null or 0 length
     */
    public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) {
        if (bytes == null || bytes.length == 0) {
            throw new IllegalArgumentException("Search term bytes must not be null or empty");
        }
        this.bytes = defensiveCopy ? Arrays.copyOf(bytes, bytes.length) : bytes;
        // The bytes never change after construction, so the hash is computed once.
        this.hashCode = Arrays.hashCode(this.bytes);
        this.reference = reference;
    }

    /**
     * @param index position within the term
     * @return the byte at the given position as an unsigned value (0-255)
     */
    public int get(final int index) {
        return bytes[index] & 0xff;
    }

    /**
     * @return size of the search term in bytes
     */
    public int size() {
        return bytes.length;
    }

    /**
     * @return reference object for this given search term
     */
    public T getReference() {
        return reference;
    }

    /**
     * Determines if this term starts with the first {@code windowLength} bytes
     * of the given window.
     *
     * @param window Current window of bytes from the haystack being evaluated.
     * @param windowLength The length of the window to consider
     * @return true if this term starts with the same bytes of the given window;
     *         false if they differ or the window is longer than the term
     * @throws IndexOutOfBoundsException if {@code windowLength} exceeds the
     *             length of {@code window}
     */
    public boolean startsWith(byte[] window, int windowLength) {
        if (windowLength > window.length) {
            throw new IndexOutOfBoundsException(
                    "windowLength " + windowLength + " exceeds window size " + window.length);
        }
        if (bytes.length < windowLength) {
            return false;
        }
        // bytes.length >= windowLength here, so windowLength alone bounds the loop.
        for (int i = 0; i < windowLength; i++) {
            if (bytes[i] != window[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return a defensive copy of the internal byte structure
     */
    public byte[] getBytes() {
        return Arrays.copyOf(bytes, bytes.length);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    /**
     * Two search terms are equal when they are of the same class and hold the
     * same bytes, regardless of their reference objects.
     */
    @Override
    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final SearchTerm<?> other = (SearchTerm<?>) obj;
        // Cheap rejection before comparing the arrays byte by byte.
        if (this.hashCode != other.hashCode) {
            return false;
        }
        return Arrays.equals(this.bytes, other.bytes);
    }

    /**
     * @return the bytes decoded with the platform default charset; use
     *         {@link #toString(Charset)} for a result that does not depend on
     *         the platform
     */
    @Override
    public String toString() {
        return new String(bytes);
    }

    /**
     * @param charset the charset used to decode the bytes
     * @return the bytes of this term decoded with the given charset
     */
    public String toString(final Charset charset) {
        return new String(bytes, charset);
    }
}
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.search.ahocorasick; - -import java.io.IOException; -import java.io.InputStream; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Set; - -import org.apache.nifi.util.search.Search; -import org.apache.nifi.util.search.SearchTerm; - -public class AhoCorasick<T> implements Search<T> { - - private Node root = null; - - /** - * Constructs a new search object. 
- * - * @throws IllegalArgumentException if given terms are null or empty - */ - public AhoCorasick() { - } - - @Override - public void initializeDictionary(final Set<SearchTerm<T>> terms) { - if (root != null) { - throw new IllegalStateException(); - } - root = new Node(); - if (terms == null || terms.isEmpty()) { - throw new IllegalArgumentException(); - } - for (final SearchTerm<T> term : terms) { - int i = 0; - Node nextNode = root; - while (true) { - nextNode = addMatch(term, i, nextNode); - if (nextNode == null) { - break; //we're done - } - i++; - } - } - initialize(); - } - - private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) { - final int index = term.get(offset); - boolean atEnd = (offset == (term.size() - 1)); - if (current.getNeighbor(index) == null) { - if (atEnd) { - current.setNeighbor(new Node(term), index); - return null; - } - current.setNeighbor(new Node(), index); - } else if (atEnd) { - current.getNeighbor(index).setMatchingTerm(term); - return null; - } - return current.getNeighbor(index); - } - - private void initialize() { - //perform bgs to build failure links - final Queue<Node> queue = new LinkedList<>(); - queue.add(root); - root.setFailureNode(null); - while (!queue.isEmpty()) { - final Node current = queue.poll(); - for (int i = 0; i < 256; i++) { - final Node next = current.getNeighbor(i); - if (next != null) { - //traverse failure to get state - Node fail = current.getFailureNode(); - while ((fail != null) && fail.getNeighbor(i) == null) { - fail = fail.getFailureNode(); - } - if (fail != null) { - next.setFailureNode(fail.getNeighbor(i)); - } else { - next.setFailureNode(root); - } - queue.add(next); - } - } - } - } - - @Override - public SearchState search(final InputStream stream, final boolean findAll) throws IOException { - return search(stream, findAll, null); - } - - private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException { - if 
(root == null) { - throw new IllegalStateException(); - } - final SearchState<T> currentState = (state == null) ? new SearchState(root) : state; - if (!findAll && currentState.foundMatch()) { - throw new IllegalStateException("A match has already been found yet we're being asked to keep searching"); - } - Node current = currentState.getCurrentNode(); - int currentChar; - while ((currentChar = stream.read()) >= 0) { - currentState.incrementBytesRead(1L); - Node next = current.getNeighbor(currentChar); - if (next == null) { - next = current.getFailureNode(); - while ((next != null) && next.getNeighbor(currentChar) == null) { - next = next.getFailureNode(); - } - if (next != null) { - next = next.getNeighbor(currentChar); - } else { - next = root; - } - } - if (next == null) { - throw new IllegalStateException("tree out of sync"); - } - //Accept condition - if (next.hasMatch()) { - currentState.addResult(next.getMatchingTerm()); - } - for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) { - if (failNode.hasMatch()) { - currentState.addResult(failNode.getMatchingTerm()); - } - } - current = next; - if (currentState.foundMatch() && !findAll) { - break;//give up as soon as we have at least one match - } - } - currentState.setCurrentNode(current); - return currentState; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java deleted file mode 100644 index 0ac325c..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.search.ahocorasick; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.util.search.SearchTerm; - -/** - * - * @author - */ -public class Node { - - private final Map<Integer, Node> neighborMap; - private Node failureNode; - private SearchTerm<?> term; - - Node(final SearchTerm<?> term) { - this(); - this.term = term; - } - - Node() { - neighborMap = new HashMap<>(); - term = null; - } - - void setFailureNode(final Node fail) { - failureNode = fail; - } - - public Node getFailureNode() { - return failureNode; - } - - public boolean hasMatch() { - return term != null; - } - - void setMatchingTerm(final SearchTerm<?> term) { - this.term = term; - } - - public SearchTerm<?> getMatchingTerm() { - return term; - } - - public Node getNeighbor(final int index) { - return neighborMap.get(index); - } - - void setNeighbor(final Node neighbor, final int index) { - neighborMap.put(index, neighbor); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java ---------------------------------------------------------------------- diff --git 
a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java deleted file mode 100644 index 6d36ad0..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.util.search.ahocorasick; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.util.search.SearchTerm; - -public class SearchState<T> { - - private Node currentNode; - private final Map<SearchTerm<T>, List<Long>> resultMap; - private long bytesRead; - - SearchState(final Node rootNode) { - resultMap = new HashMap<>(5); - currentNode = rootNode; - bytesRead = 0L; - } - - void incrementBytesRead(final long increment) { - bytesRead += increment; - } - - void setCurrentNode(final Node curr) { - currentNode = curr; - } - - public Node getCurrentNode() { - return currentNode; - } - - public Map<SearchTerm<T>, List<Long>> getResults() { - return new HashMap<>(resultMap); - } - - void addResult(final SearchTerm matchingTerm) { - final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5); - indexes.add(bytesRead); - resultMap.put(matchingTerm, indexes); - } - - public boolean foundMatch() { - return !resultMap.isEmpty(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java deleted file mode 100644 index 2b95897..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
/**
 * Operations a {@code TimedBuffer} needs in order to work with entities of
 * type {@code T}: combining two entities, creating a fresh one, and reading
 * the timestamp of one.
 * <p>
 * {@code TimedBuffer} passes null for an entity when a bin has not yet
 * received a value, so implementations used with it have to accept null
 * arguments.
 *
 * @param <T> the entity type
 */
public interface EntityAccess<T> {

    /**
     * Combines two entities into one.
     *
     * @param oldValue the value accumulated so far
     * @param toAdd the value to add to it
     * @return the combined value
     */
    T aggregate(T oldValue, T toAdd);

    /**
     * @return a new entity to start accumulating into
     */
    T createNew();

    /**
     * @param entity the entity to inspect
     * @return the timestamp of the given entity
     */
    long getTimestamp(T entity);
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.timebuffer; - -public class LongEntityAccess implements EntityAccess<TimestampedLong> { - - @Override - public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { - if (oldValue == null && toAdd == null) { - return new TimestampedLong(0L); - } else if (oldValue == null) { - return toAdd; - } else if (toAdd == null) { - return oldValue; - } - - return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); - } - - @Override - public TimestampedLong createNew() { - return new TimestampedLong(0L); - } - - @Override - public long getTimestamp(TimestampedLong entity) { - return entity == null ? 0L : entity.getTimestamp(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java deleted file mode 100644 index dd8e523..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.timebuffer; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -public class TimedBuffer<T> { - - private final int numBins; - private final EntitySum<T>[] bins; - private final EntityAccess<T> entityAccess; - private final TimeUnit binPrecision; - - @SuppressWarnings("unchecked") - public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) { - this.binPrecision = binPrecision; - this.numBins = numBins + 1; - this.bins = new EntitySum[this.numBins]; - for (int i = 0; i < this.numBins; i++) { - this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor); - } - this.entityAccess = accessor; - } - - public T add(final T entity) { - final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins); - final EntitySum<T> sum = bins[binIdx]; - - return sum.addOrReset(entity); - } - - public T getAggregateValue(final long sinceEpochMillis) { - final int startBinIdx = (int) (binPrecision.convert(sinceEpochMillis, TimeUnit.MILLISECONDS) % numBins); - - T total = null; - for (int i = 0; i < numBins; i++) { - int binIdx = (startBinIdx + i) % numBins; - final EntitySum<T> bin = bins[binIdx]; - - if (!bin.isExpired()) { - total = entityAccess.aggregate(total, bin.getValue()); - } - } - - return total; - } - - private static class 
EntitySum<S> { - - private final EntityAccess<S> entityAccess; - private final AtomicReference<S> ref = new AtomicReference<>(); - private final TimeUnit binPrecision; - private final int numConfiguredBins; - - public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) { - this.binPrecision = binPrecision; - this.entityAccess = aggregator; - this.numConfiguredBins = numConfiguredBins; - } - - private S add(final S event) { - S newValue; - S value; - do { - value = ref.get(); - newValue = entityAccess.aggregate(value, event); - } while (!ref.compareAndSet(value, newValue)); - - return newValue; - } - - public S getValue() { - return ref.get(); - } - - public boolean isExpired() { - // entityAccess.getTimestamp(curValue) represents the time at which the current value - // was last updated. If the last value is less than current time - 1 binPrecision, then it - // means that we've rolled over and need to reset the value. - final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision); - - final S curValue = ref.get(); - return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod); - } - - public S addOrReset(final S event) { - // entityAccess.getTimestamp(curValue) represents the time at which the current value - // was last updated. If the last value is less than current time - 1 binPrecision, then it - // means that we've rolled over and need to reset the value. 
- final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision); - - final S curValue = ref.get(); - if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) { - ref.compareAndSet(curValue, entityAccess.createNew()); - } - return add(event); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java deleted file mode 100644 index 07d31ea..0000000 --- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * A {@link Long} value paired with the time, in milliseconds since the epoch,
 * at which this object was created.
 */
public class TimestampedLong {

    private final Long value;
    private final long timestamp;

    /**
     * @param value the value to hold
     */
    public TimestampedLong(final Long value) {
        this.timestamp = System.currentTimeMillis();
        this.value = value;
    }

    /**
     * @return the value given at construction
     */
    public Long getValue() {
        return this.value;
    }

    /**
     * @return the creation time of this object in milliseconds since the epoch
     */
    public long getTimestamp() {
        return this.timestamp;
    }
}
- */ -package org.apache.nifi.remote.io; - -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; - -import org.apache.nifi.remote.io.CompressionInputStream; -import org.apache.nifi.remote.io.CompressionOutputStream; - -import org.junit.Test; - -public class TestCompressionInputOutputStreams { - - @Test - public void testSimple() throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - final byte[] data = "Hello, World!".getBytes("UTF-8"); - - final CompressionOutputStream cos = new CompressionOutputStream(baos); - cos.write(data); - cos.flush(); - cos.close(); - - final byte[] compressedBytes = baos.toByteArray(); - final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); - final byte[] decompressed = readFully(cis); - - assertTrue(Arrays.equals(data, decompressed)); - } - - @Test - public void testDataLargerThanBuffer() throws IOException { - final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; - - final StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 100; i++) { - sb.append(str); - } - final byte[] data = sb.toString().getBytes("UTF-8"); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); - cos.write(data); - cos.flush(); - cos.close(); - - final byte[] compressedBytes = baos.toByteArray(); - final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); - final byte[] decompressed = readFully(cis); - - assertTrue(Arrays.equals(data, decompressed)); - } - - @Test - public void testDataLargerThanBufferWhileFlushing() throws IOException { - final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; - final byte[] data = str.getBytes("UTF-8"); 
- - final StringBuilder sb = new StringBuilder(); - final byte[] data1024; - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); - for (int i = 0; i < 1024; i++) { - cos.write(data); - cos.flush(); - sb.append(str); - } - cos.close(); - data1024 = sb.toString().getBytes("UTF-8"); - - final byte[] compressedBytes = baos.toByteArray(); - final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); - final byte[] decompressed = readFully(cis); - - assertTrue(Arrays.equals(data1024, decompressed)); - } - - @Test - public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException { - final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; - final byte[] data = str.getBytes("UTF-8"); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); - for (int i = 0; i < 512; i++) { - cos.write(data); - cos.flush(); - } - cos.close(); - - final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192); - for (int i = 0; i < 512; i++) { - cos2.write(data); - cos2.flush(); - } - cos2.close(); - - final byte[] data512; - final StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 512; i++) { - sb.append(str); - } - data512 = sb.toString().getBytes("UTF-8"); - - final byte[] compressedBytes = baos.toByteArray(); - final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes); - - final CompressionInputStream cis = new CompressionInputStream(bais); - final byte[] decompressed = readFully(cis); - assertTrue(Arrays.equals(data512, decompressed)); - - final CompressionInputStream cis2 = new CompressionInputStream(bais); - final byte[] decompressed2 = readFully(cis2); - assertTrue(Arrays.equals(data512, decompressed2)); - } - - private byte[] readFully(final InputStream in) throws IOException { 
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) >= 0) { - baos.write(buffer, 0, len); - } - - return baos.toByteArray(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java deleted file mode 100644 index 52bd8de..0000000 --- a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.stream.io; - -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -import org.junit.Ignore; -import org.junit.Test; - -@Ignore("Tests are time-based") -public class TestLeakyBucketThrottler { - - @Test(timeout = 10000) - public void testOutputStreamInterface() throws IOException { - // throttle rate at 1 MB/sec - final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); - - final byte[] data = new byte[1024 * 1024 * 4]; - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final OutputStream throttledOut = throttler.newThrottledOutputStream(baos); - - final long start = System.currentTimeMillis(); - throttledOut.write(data); - throttler.close(); - final long millis = System.currentTimeMillis() - start; - // should take 4 sec give or take - assertTrue(millis > 3000); - assertTrue(millis < 6000); - } - - @Test(timeout = 10000) - public void testInputStreamInterface() throws IOException { - // throttle rate at 1 MB/sec - final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); - - final byte[] data = new byte[1024 * 1024 * 4]; - final ByteArrayInputStream bais = new ByteArrayInputStream(data); - final InputStream throttledIn = throttler.newThrottledInputStream(bais); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - final byte[] buffer = new byte[4096]; - final long start = System.currentTimeMillis(); - int len; - while ((len = throttledIn.read(buffer)) > 0) { - baos.write(buffer, 0, len); - } - throttler.close(); - final long millis = System.currentTimeMillis() - start; - // should take 4 
sec give or take - assertTrue(millis > 3000); - assertTrue(millis < 6000); - baos.close(); - } - - @Test(timeout = 10000) - public void testDirectInterface() throws IOException, InterruptedException { - // throttle rate at 1 MB/sec - final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); - - // create 3 threads, each sending ~2 MB - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final List<Thread> threads = new ArrayList<Thread>(); - for (int i = 0; i < 3; i++) { - final Thread t = new WriterThread(i, throttler, baos); - threads.add(t); - } - - final long start = System.currentTimeMillis(); - for (final Thread t : threads) { - t.start(); - } - - for (final Thread t : threads) { - t.join(); - } - final long elapsed = System.currentTimeMillis() - start; - - throttler.close(); - - // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to - // allow for busy-ness and the fact that we could write a tiny bit more than the limit. 
- assertTrue(elapsed > 5000); - assertTrue(elapsed < 7000); - - // ensure bytes were copied out appropriately - assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength()); - assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]); - } - - private static class WriterThread extends Thread { - - private final int idx; - private final byte[] data = new byte[1024 * 1024 * 2 + 1]; - private final LeakyBucketStreamThrottler throttler; - private final OutputStream out; - - public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) { - this.idx = idx; - this.throttler = throttler; - this.out = out; - this.data[this.data.length - 1] = (byte) 'A'; - } - - @Override - public void run() { - long startMillis = System.currentTimeMillis(); - long bytesWritten = 0L; - try { - throttler.copy(new ByteArrayInputStream(data), out); - } catch (IOException e) { - e.printStackTrace(); - return; - } - long now = System.currentTimeMillis(); - long millisElapsed = now - startMillis; - bytesWritten += data.length; - float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F; - System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec"); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java b/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java deleted file mode 100644 index 0838e96..0000000 --- a/nifi/commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.util;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

/**
 * Feeds byte sequences into a {@link NaiveSearchRingBuffer} one byte at a time
 * and checks at which position {@code addAndCompare} reports a match.
 */
public class TestNaiveSearchRingBuffer {

    /** The 4-byte pattern occurs once in the input, ending at index 10. */
    @Test
    public void testAddAndCompare() {
        final byte[] pattern = new byte[]{'\r', '0', 38, 48};
        final byte[] search = new byte[]{
            '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38
        };

        assertMatchesOnlyAt(pattern, search, 10);
    }

    /**
     * Same check with three bytes above 127 ahead of the pattern, which moves the
     * match to index 13.
     * NOTE(review): despite its name, this test only ever calls
     * {@code addAndCompare}.
     */
    @Test
    public void testGetOldestByte() {
        final byte[] pattern = new byte[]{'\r', '0', 38, 48};
        final byte[] search = new byte[]{
            '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38
        };

        assertMatchesOnlyAt(pattern, search, 13);
    }

    /**
     * Pushes every byte of {@code search} into a fresh ring buffer built for
     * {@code pattern} and asserts that a match is reported at
     * {@code matchIndex} and at no other position.
     */
    private static void assertMatchesOnlyAt(final byte[] pattern, final byte[] search, final int matchIndex) {
        final NaiveSearchRingBuffer ring = new NaiveSearchRingBuffer(pattern);
        for (int i = 0; i < search.length; i++) {
            final boolean matched = ring.addAndCompare(search[i]);
            if (i != matchIndex) {
                assertFalse(matched);
            } else {
                assertTrue(matched);
            }
        }
    }

}