http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java b/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java new file mode 100644 index 0000000..8920493 --- /dev/null +++ b/commons/nifi-file-utils/src/main/java/org/apache/nifi/file/FileUtils.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.file; + +import java.io.BufferedInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import org.apache.commons.codec.digest.DigestUtils; + +import org.slf4j.Logger; + +/** + * A utility class containing a few useful static methods to do typical IO + * operations. + * + * @author unattributed + */ +public class FileUtils { + + public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 MB chunks + public static final long MILLIS_BETWEEN_ATTEMPTS = 50L; + + /** + * Closes the given closeable quietly - no logging, no exceptions... 
+ * + * @param closeable + */ + public static void closeQuietly(final Closeable closeable) { + if (null != closeable) { + try { + closeable.close(); + } catch (final IOException io) {/*IGNORE*/ + + } + } + } + + /** + * Releases the given lock quietly - no logging, no exception + * + * @param lock + */ + public static void releaseQuietly(final FileLock lock) { + if (null != lock) { + try { + lock.release(); + } catch (final IOException io) { + /*IGNORE*/ + } + } + } + + public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException { + if (dir.exists() && !dir.isDirectory()) { + throw new IOException(dir.getAbsolutePath() + " is not a directory"); + } else if (!dir.exists()) { + final boolean made = dir.mkdirs(); + if (!made) { + throw new IOException(dir.getAbsolutePath() + " could not be created"); + } + } + if (!(dir.canRead() && dir.canWrite())) { + throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege"); + } + } + + /** + * Deletes the given file. If the given file exists but could not be deleted + * this will be printed as a warning to the given logger + * + * @param file + * @param logger + * @return + */ + public static boolean deleteFile(final File file, final Logger logger) { + return FileUtils.deleteFile(file, logger, 1); + } + + /** + * Deletes the given file. 
If the given file exists but could not be deleted + * this will be printed as a warning to the given logger + * + * @param file + * @param logger + * @param attempts indicates how many times an attempt to delete should be + * made + * @return true if given file no longer exists + */ + public static boolean deleteFile(final File file, final Logger logger, final int attempts) { + if(file == null){ + return false; + } + boolean isGone = false; + try { + if (file.exists()) { + final int effectiveAttempts = Math.max(1, attempts); + for (int i = 0; i < effectiveAttempts && !isGone; i++) { + isGone = file.delete() || !file.exists(); + if (!isGone && (effectiveAttempts - i) > 1) { + FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS); + } + } + if (!isGone && logger != null) { + logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath()); + } + } + } catch (final Throwable t) { + if (logger != null) { + logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t); + } + } + return isGone; + } + + /** + * Deletes all of the given files. If any exist and cannot be deleted that + * will be printed at warn to the given logger. + * + * @param files can be null + * @param logger can be null + */ + public static void deleteFile(final List<File> files, final Logger logger) { + FileUtils.deleteFile(files, logger, 1); + } + + /** + * Deletes all of the given files. If any exist and cannot be deleted that + * will be printed at warn to the given logger. 
+ * + * @param files can be null + * @param logger can be null + * @param attempts indicates how many times an attempt should be made to + * delete each file + */ + public static void deleteFile(final List<File> files, final Logger logger, final int attempts) { + if (null == files || files.isEmpty()) { + return; + } + final int effectiveAttempts = Math.max(1, attempts); + for (final File file : files) { + try { + boolean isGone = false; + for (int i = 0; i < effectiveAttempts && !isGone; i++) { + isGone = file.delete() || !file.exists(); + if (!isGone && (effectiveAttempts - i) > 1) { + FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS); + } + } + if (!isGone && logger != null) { + logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath()); + } + } catch (final Throwable t) { + if (null != logger) { + logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t); + } + } + } + } + + /** + * Deletes all files (not directories..) in the given directory (non + * recursive) that match the given filename filter. If any file cannot be + * deleted then this is printed at warn to the given logger. + * + * @param directory + * @param filter if null then no filter is used + * @param logger + */ + public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) { + FileUtils.deleteFilesInDir(directory, filter, logger, false); + } + + /** + * Deletes all files (not directories) in the given directory (recursive) + * that match the given filename filter. If any file cannot be deleted then + * this is printed at warn to the given logger. 
+ * + * @param directory + * @param filter if null then no filter is used + * @param logger + * @param recurse + */ + public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) { + FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false); + } + + /** + * Deletes all files (not directories) in the given directory (recursive) + * that match the given filename filter. If any file cannot be deleted then + * this is printed at warn to the given logger. + * + * @param directory + * @param filter if null then no filter is used + * @param logger + * @param recurse + * @param deleteEmptyDirectories default is false; if true will delete + * directories found that are empty + */ + public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) { + // ensure the specified directory is actually a directory and that it exists + if (null != directory && directory.isDirectory()) { + final File ingestFiles[] = directory.listFiles(); + for (File ingestFile : ingestFiles) { + boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName()); + if (ingestFile.isFile() && process) { + FileUtils.deleteFile(ingestFile, logger, 3); + } + if (ingestFile.isDirectory() && recurse) { + FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories); + if (deleteEmptyDirectories && ingestFile.list().length == 0) { + FileUtils.deleteFile(ingestFile, logger, 3); + } + } + } + } + } + + /** + * Deletes given files. 
+ * + * @param files + * @param recurse will recurse + * @throws IOException + */ + public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException { + for (final File file : files) { + FileUtils.deleteFile(file, recurse); + } + } + + public static void deleteFile(final File file, final boolean recurse) throws IOException { + if (file.isDirectory() && recurse) { + FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse); + } + //now delete the file itself regardless of whether it is plain file or a directory + if (!FileUtils.deleteFile(file, null, 5)) { + throw new IOException("Unable to delete " + file.getAbsolutePath()); + } + } + + /** + * Randomly generates a sequence of bytes and overwrites the contents of the + * file a number of times. The file is then deleted. + * + * @param file File to be overwritten a number of times and, ultimately, + * deleted + * @param passes Number of times file should be overwritten + * @throws IOException if something makes shredding or deleting a problem + */ + public static void shredFile(final File file, final int passes) + throws IOException { + final Random generator = new Random(); + final long fileLength = file.length(); + final int byteArraySize = (int) Math.min(fileLength, 1048576); // 1MB + final byte[] b = new byte[byteArraySize]; + final long numOfRandomWrites = (fileLength / b.length) + 1; + final FileOutputStream fos = new FileOutputStream(file); + try { + // Over write file contents (passes) times + final FileChannel channel = fos.getChannel(); + for (int i = 0; i < passes; i++) { + generator.nextBytes(b); + for (int j = 0; j <= numOfRandomWrites; j++) { + fos.write(b); + } + fos.flush(); + channel.position(0); + } + // Write out "0" for each byte in the file + Arrays.fill(b, (byte) 0); + for (int j = 0; j < numOfRandomWrites; j++) { + fos.write(b); + } + fos.flush(); + fos.close(); + // Try to delete the file a few times + if (!FileUtils.deleteFile(file, null, 5)) { + 
throw new IOException("Failed to delete file after shredding"); + } + + } finally { + FileUtils.closeQuietly(fos); + } + } + + public static long copy(final InputStream in, final OutputStream out) throws IOException { + final byte[] buffer = new byte[65536]; + long copied = 0L; + int len; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + copied += len; + } + + return copied; + } + + public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException { + FileOutputStream fos = null; + FileLock outLock = null; + long fileSize = 0L; + try { + fos = new FileOutputStream(destination); + final FileChannel out = fos.getChannel(); + if (lockOutputFile) { + outLock = out.tryLock(0, Long.MAX_VALUE, false); + if (null == outLock) { + throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath()); + } + } + fos.write(bytes); + fos.flush(); + fileSize = bytes.length; + } finally { + FileUtils.releaseQuietly(outLock); + FileUtils.closeQuietly(fos); + } + return fileSize; + } + + /** + * Copies the given source file to the given destination file. The given + * destination will be overwritten if it already exists. + * + * @param source + * @param destination + * @param lockInputFile if true will lock input file during copy; if false + * will not + * @param lockOutputFile if true will lock output file during copy; if false + * will not + * @param move if true will perform what is effectively a move operation + * rather than a pure copy. This allows for potentially highly efficient + * movement of the file but if not possible this will revert to a copy then + * delete behavior. If false, then the file is copied and the source file is + * retained. If a true rename/move occurs then no lock is held during that + * time. + * @param logger if failures occur, they will be logged to this logger if + * possible. 
If this logger is null, an IOException will instead be thrown, + * indicating the problem. + * @return long number of bytes copied + * @throws FileNotFoundException if the source file could not be found + * @throws IOException + * @throws SecurityException if a security manager denies the needed file + * operations + */ + public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final boolean move, final Logger logger) throws FileNotFoundException, IOException { + + FileInputStream fis = null; + FileOutputStream fos = null; + FileLock inLock = null; + FileLock outLock = null; + long fileSize = 0L; + if (!source.canRead()) { + throw new IOException("Must at least have read permission"); + + } + if (move && source.renameTo(destination)) { + fileSize = destination.length(); + } else { + try { + fis = new FileInputStream(source); + fos = new FileOutputStream(destination); + final FileChannel in = fis.getChannel(); + final FileChannel out = fos.getChannel(); + if (lockInputFile) { + inLock = in.tryLock(0, Long.MAX_VALUE, true); + if (null == inLock) { + throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath()); + } + } + if (lockOutputFile) { + outLock = out.tryLock(0, Long.MAX_VALUE, false); + if (null == outLock) { + throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath()); + } + } + long bytesWritten = 0; + do { + bytesWritten += out.transferFrom(in, bytesWritten, TRANSFER_CHUNK_SIZE_BYTES); + fileSize = in.size(); + } while (bytesWritten < fileSize); + out.force(false); + FileUtils.closeQuietly(fos); + FileUtils.closeQuietly(fis); + fos = null; + fis = null; + if (move && !FileUtils.deleteFile(source, null, 5)) { + if (logger == null) { + FileUtils.deleteFile(destination, null, 5); + throw new IOException("Could not remove file " + source.getAbsolutePath()); + } else { + logger.warn("Configured to delete source file 
when renaming/move not successful. However, unable to delete file at: " + source.getAbsolutePath()); + } + } + } finally { + FileUtils.releaseQuietly(inLock); + FileUtils.releaseQuietly(outLock); + FileUtils.closeQuietly(fos); + FileUtils.closeQuietly(fis); + } + } + return fileSize; + } + + /** + * Copies the given source file to the given destination file. The given + * destination will be overwritten if it already exists. + * + * @param source + * @param destination + * @param lockInputFile if true will lock input file during copy; if false + * will not + * @param lockOutputFile if true will lock output file during copy; if false + * will not + * @param logger + * @return long number of bytes copied + * @throws FileNotFoundException if the source file could not be found + * @throws IOException + * @throws SecurityException if a security manager denies the needed file + * operations + */ + public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException { + return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger); + } + + public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException { + FileInputStream fis = null; + FileLock inLock = null; + long fileSize = 0L; + try { + fis = new FileInputStream(source); + final FileChannel in = fis.getChannel(); + if (lockInputFile) { + inLock = in.tryLock(0, Long.MAX_VALUE, true); + if (inLock == null) { + throw new IOException("Unable to obtain exclusive file lock for: " + source.getAbsolutePath()); + } + + } + + byte[] buffer = new byte[1 << 18]; //256 KB + int bytesRead = -1; + while ((bytesRead = fis.read(buffer)) != -1) { + stream.write(buffer, 0, bytesRead); + } + in.force(false); + stream.flush(); + fileSize = in.size(); + } finally { + 
FileUtils.releaseQuietly(inLock); + FileUtils.closeQuietly(fis); + if (closeOutputStream) { + FileUtils.closeQuietly(stream); + } + } + return fileSize; + } + + public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException { + final Path destPath = destination.toPath(); + final long size = Files.copy(stream, destPath); + if (closeInputStream) { + stream.close(); + } + return size; + } + + /** + * Renames the given file from the source path to the destination path. This + * handles multiple attempts. This should only be used to rename within a + * given directory. Renaming across directories might not work well. See the + * <code>File.renameTo</code> for more information. + * + * @param source the file to rename + * @param destination the file path to rename to + * @param maxAttempts the max number of attempts to attempt the rename + * @throws IOException if rename isn't successful + */ + public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException { + FileUtils.renameFile(source, destination, maxAttempts, false); + } + + /** + * Renames the given file from the source path to the destination path. This + * handles multiple attempts. This should only be used to rename within a + * given directory. Renaming across directories might not work well. See the + * <code>File.renameTo</code> for more information. + * + * @param source the file to rename + * @param destination the file path to rename to + * @param maxAttempts the max number of attempts to attempt the rename + * @param replace if true and a rename attempt fails will check if a file is + * already at the destination path. If so it will delete that file and + * attempt the rename according the remaining maxAttempts. If false, any + * conflicting files will be left as they were and the rename attempts will + * fail if conflicting. 
+ * @throws IOException if rename isn't successful + */ + public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException { + final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts; + boolean renamed = false; + for (int i = 0; i < attempts; i++) { + renamed = source.renameTo(destination); + if (!renamed) { + FileUtils.deleteFile(destination, null, 5); + } else { + break; //rename has succeeded + } + } + if (!renamed) { + throw new IOException("Attempted " + maxAttempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'"); + + } + } + + public static void sleepQuietly(final long millis) { + try { + Thread.sleep(millis); + } catch (final InterruptedException ex) { + /* do nothing */ + } + } + + /** + * Syncs a primary copy of a file with the copy in the restore directory. If + * the restore directory does not have a file and the primary has a file, + * the the primary's file is copied to the restore directory. Else if the + * restore directory has a file, but the primary does not, then the + * restore's file is copied to the primary directory. Else if the primary + * file is different than the restore file, then an IllegalStateException is + * thrown. Otherwise, if neither file exists, then no syncing is performed. 
+ * + * @param primaryFile the primary file + * @param restoreFile the restore file + * @param logger a logger + * @throws IOException if an I/O problem was encountered during syncing + * @throws IllegalStateException if the primary and restore copies exist but + * are different + */ + public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger) + throws IOException { + + if (primaryFile.exists() && !restoreFile.exists()) { + // copy primary file to restore + copyFile(primaryFile, restoreFile, false, false, logger); + } else if (restoreFile.exists() && !primaryFile.exists()) { + // copy restore file to primary + copyFile(restoreFile, primaryFile, false, false, logger); + } else if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) { + throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", + primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); + } + } + + /** + * Returns true if the given files are the same according to their MD5 hash. + * + * @param file1 a file + * @param file2 a file + * @return true if the files are the same; false otherwise + * @throws IOException if the MD5 hash could not be computed + */ + public static boolean isSame(final File file1, final File file2) throws IOException { + return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2)); + } + + /** + * Returns the MD5 hash of the given file. + * + * @param file a file + * @return the MD5 hash + * @throws IOException if the MD5 hash could not be computed + */ + public static byte[] computeMd5Digest(final File file) throws IOException { + BufferedInputStream bis = null; + try { + bis = new BufferedInputStream(new FileInputStream(file)); + return DigestUtils.md5(bis); + } finally { + FileUtils.closeQuietly(bis); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-logging-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-logging-utils/pom.xml b/commons/nifi-logging-utils/pom.xml new file mode 100644 index 0000000..ce5064b --- /dev/null +++ b/commons/nifi-logging-utils/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-logging-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>NiFi Logging Utils</name> + <description>Utilities for logging</description> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java ---------------------------------------------------------------------- diff --git a/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java b/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java new file mode 100644 index 0000000..7c71d85 --- /dev/null +++ b/commons/nifi-logging-utils/src/main/java/org/apache/nifi/logging/NiFiLog.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.logging; + +import org.slf4j.Logger; +import org.slf4j.Marker; + +/** + * + * @author unattributed + */ +public class NiFiLog implements Logger { + + private final Logger logger; + + public NiFiLog(final Logger logger) { + this.logger = logger; + } + + public Logger getWrappedLog() { + return logger; + } + + @Override + public void warn(Marker marker, String string, Throwable thrwbl) { + if (logger.isDebugEnabled()) { + logger.warn(marker, string, thrwbl); + } else { + logger.warn(marker, string); + } + } + + @Override + public void warn(Marker marker, String string, Object[] os) { + logger.warn(marker, string, os); + } + + @Override + public void warn(Marker marker, String string, Object o, Object o1) { + logger.warn(marker, string, o, o1); + } + + @Override + public void warn(Marker marker, String string, Object o) { + logger.warn(marker, string, o); + } + + @Override + public void warn(Marker marker, String string) { + logger.warn(marker, string); + } + + @Override + public void warn(String string, Throwable thrwbl) { + if (logger.isDebugEnabled()) { + logger.warn(string, thrwbl); + } else { + logger.warn(string); + } + } + + @Override + public void warn(String string, Object o, Object o1) { + logger.warn(string, o, o1); + } + + @Override + public void warn(String string, Object[] os) { + logger.warn(string, os); + } + + @Override + public void warn(String string, Object o) { + logger.warn(string, o); + } + + @Override + public void warn(String string) { + logger.warn(string); + } + + @Override + public void trace(Marker marker, String string, Throwable thrwbl) { + logger.trace(marker, string, thrwbl); + } + + @Override + public void trace(Marker marker, String string, Object[] os) { + logger.trace(marker, string, os); + } + + @Override + public void trace(Marker marker, String string, Object o, Object o1) { + 
logger.trace(marker, string, o, o1); + } + + @Override + public void trace(Marker marker, String string, Object o) { + logger.trace(marker, string, o); + } + + @Override + public void trace(Marker marker, String string) { + logger.trace(marker, string); + } + + @Override + public void trace(String string, Throwable thrwbl) { + logger.trace(string, thrwbl); + } + + @Override + public void trace(String string, Object[] os) { + logger.trace(string, os); + } + + @Override + public void trace(String string, Object o, Object o1) { + logger.trace(string, o, o1); + } + + @Override + public void trace(String string, Object o) { + logger.trace(string, o); + } + + @Override + public void trace(String string) { + logger.trace(string); + } + + @Override + public boolean isWarnEnabled(Marker marker) { + return logger.isWarnEnabled(marker); + } + + @Override + public boolean isWarnEnabled() { + return logger.isWarnEnabled(); + } + + @Override + public boolean isTraceEnabled(Marker marker) { + return logger.isTraceEnabled(marker); + } + + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + + @Override + public boolean isInfoEnabled(Marker marker) { + return logger.isInfoEnabled(marker); + } + + @Override + public boolean isInfoEnabled() { + return logger.isInfoEnabled(); + } + + @Override + public boolean isErrorEnabled(Marker marker) { + return logger.isErrorEnabled(marker); + } + + @Override + public boolean isErrorEnabled() { + return logger.isErrorEnabled(); + } + + @Override + public boolean isDebugEnabled(Marker marker) { + return logger.isDebugEnabled(marker); + } + + @Override + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } + + @Override + public void info(Marker marker, String string, Throwable thrwbl) { + if (logger.isDebugEnabled()) { + logger.info(marker, string, thrwbl); + } else { + logger.info(marker, string); + } + } + + @Override + public void info(Marker marker, String string, Object[] os) { + 
logger.info(marker, string, os); + } + + @Override + public void info(Marker marker, String string, Object o, Object o1) { + logger.info(marker, string, o, o1); + } + + @Override + public void info(Marker marker, String string, Object o) { + logger.info(marker, string, o); + } + + @Override + public void info(Marker marker, String string) { + logger.info(marker, string); + } + + @Override + public void info(String string, Throwable thrwbl) { + if (logger.isDebugEnabled()) { + logger.info(string, thrwbl); + } else { + logger.info(string); + } + } + + @Override + public void info(String string, Object[] os) { + logger.info(string, os); + } + + @Override + public void info(String string, Object o, Object o1) { + logger.info(string, o, o1); + } + + @Override + public void info(String string, Object o) { + logger.info(string, o); + } + + @Override + public void info(String string) { + logger.info(string); + } + + @Override + public String getName() { + return logger.getName(); + } + + @Override + public void error(Marker marker, String string, Throwable thrwbl) { + if (logger.isDebugEnabled()) { + logger.error(marker, string, thrwbl); + } else { + logger.error(marker, string); + } + } + + @Override + public void error(Marker marker, String string, Object[] os) { + logger.error(marker, string, os); + } + + @Override + public void error(Marker marker, String string, Object o, Object o1) { + logger.error(marker, string, o, o1); + } + + @Override + public void error(Marker marker, String string, Object o) { + logger.error(marker, string, o); + } + + @Override + public void error(Marker marker, String string) { + logger.error(marker, string); + } + + @Override + public void error(String string, Throwable thrwbl) { + if (logger.isDebugEnabled()) { + logger.error(string, thrwbl); + } else { + logger.error(string); + } + } + + @Override + public void error(String string, Object[] os) { + logger.error(string, os); + } + + @Override + public void error(String string, Object o, 
Object o1) { + logger.error(string, o, o1); + } + + @Override + public void error(String string, Object o) { + logger.error(string, o); + } + + @Override + public void error(String string) { + logger.error(string); + } + + @Override + public void debug(Marker marker, String string, Throwable thrwbl) { + logger.debug(marker, string, thrwbl); + } + + @Override + public void debug(Marker marker, String string, Object[] os) { + logger.debug(marker, string, os); + } + + @Override + public void debug(Marker marker, String string, Object o, Object o1) { + logger.debug(marker, string, o, o1); + } + + @Override + public void debug(Marker marker, String string, Object o) { + logger.debug(marker, string, o); + } + + @Override + public void debug(Marker marker, String string) { + logger.debug(marker, string); + } + + @Override + public void debug(String string, Throwable thrwbl) { + logger.debug(string, thrwbl); + } + + @Override + public void debug(String string, Object[] os) { + logger.debug(string, os); + } + + @Override + public void debug(String string, Object o, Object o1) { + logger.debug(string, o, o1); + } + + @Override + public void debug(String string, Object o) { + logger.debug(string, o); + } + + @Override + public void debug(String string) { + logger.debug(string); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-parent/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-parent/pom.xml b/commons/nifi-parent/pom.xml new file mode 100644 index 0000000..7684d53 --- /dev/null +++ b/commons/nifi-parent/pom.xml @@ -0,0 +1,217 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>pom</packaging> + <name>NiFi Parent</name> + + <description>A helpful parent pom which can be used for all NiFi components. 
Helps establish the basic requirements/dependencies.</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <org.slf4j.version>1.7.7</org.slf4j.version> + </properties> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <effort>Max</effort> + <threshold>Medium</threshold> + <xmlOutput>true</xmlOutput> + </configuration> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nar-maven-plugin</artifactId> + <version>0.0.1-SNAPSHOT</version> + <extensions>true</extensions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + <fork>true</fork> + </configuration> + <version>3.2</version> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + </plugin> + <plugin> + <artifactId>maven-war-plugin</artifactId> + <version>2.5</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.9</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.5.2</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-release-plugin</artifactId> + <version>2.5.1</version> + </plugin> + 
<plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>jaxb2-maven-plugin</artifactId> + <version>1.6</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.4</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.3.2</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <version>3.4</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.1</version> + <configuration> + <failOnError>false</failOnError> + <quiet>true</quiet> + <show>private</show> + </configuration> + </plugin> + </plugins> + </build> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.1.2</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <version>${org.slf4j.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <version>${org.slf4j.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${org.slf4j.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> + <!-- required for libraries using commons-loggings --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + 
<artifactId>mockito-core</artifactId> + <version>1.10.8</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>${org.slf4j.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <!-- All projects use the same distribution management settings for publishing artifacts, + but the repositories for obtaining them are specified in each + user's settings.xml file --> + <distributionManagement> + <repository> + <id>nifi-releases</id> + <url>${nifi.repo.url}</url> + </repository> + <snapshotRepository> + <id>nifi-snapshots</id> + <url>${nifi.snapshot.repo.url}</url> + </snapshotRepository> + </distributionManagement> + <reporting> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <effort>Max</effort> + <threshold>Medium</threshold> + <xmlOutput>true</xmlOutput> + </configuration> + </plugin> + </plugins> + </reporting> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/.gitignore ---------------------------------------------------------------------- diff --git a/commons/nifi-properties/.gitignore b/commons/nifi-properties/.gitignore new file mode 100755 index 0000000..073c9fa --- /dev/null +++ b/commons/nifi-properties/.gitignore @@ -0,0 +1,3 @@ +/target +/target +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-properties/pom.xml b/commons/nifi-properties/pom.xml new file mode 100644 index 0000000..70f90aa --- /dev/null +++ b/commons/nifi-properties/pom.xml @@ -0,0 +1,29 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>nifi-properties</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>NiFi Properties</name> + + <dependencies> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java new file mode 100644 index 0000000..1520814 --- /dev/null +++ b/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class NiFiProperties extends Properties { + + private static final long serialVersionUID = 2119177359005492702L; + + private static final Logger LOG = LoggerFactory.getLogger(NiFiProperties.class); + private static NiFiProperties instance = null; + + // core properties + public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; + public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file"; + public static final String FLOW_CONFIGURATION_ARCHIVE_FILE = "nifi.flow.configuration.archive.file"; + public static final String TASK_CONFIGURATION_FILE = "nifi.reporting.task.configuration.file"; + public static final String SERVICE_CONFIGURATION_FILE = "nifi.controller.service.configuration.file"; + public static final String AUTHORITY_PROVIDER_CONFIGURATION_FILE = "nifi.authority.provider.configuration.file"; + public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory"; + public static 
final String RESTORE_DIRECTORY = "nifi.restore.directory"; + public static final String VERSION = "nifi.version"; + public static final String WRITE_DELAY_INTERVAL = "nifi.flowservice.writedelay.interval"; + public static final String AUTO_RESUME_STATE = "nifi.flowcontroller.autoResumeState"; + public static final String FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.period"; + public static final String NAR_LIBRARY_DIRECTORY = "nifi.nar.library.directory"; + public static final String NAR_WORKING_DIRECTORY = "nifi.nar.working.directory"; + public static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory"; + public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key"; + public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; + public static final String SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider"; + public static final String H2_URL_APPEND = "nifi.h2.url.append"; + public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port"; + public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; + public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; + public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; + public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; + + // content repository properties + public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; + public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation"; + public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size"; + public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files"; + public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period"; + public 
static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage"; + public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage"; + public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled"; + public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency"; + public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url"; + + // flowfile repository properties + public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation"; + public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync"; + public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory"; + public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions"; + public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval"; + public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation"; + public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold"; + public static final String SWAP_STORAGE_LOCATION = "nifi.swap.storage.directory"; + public static final String SWAP_IN_THREADS = "nifi.swap.in.threads"; + public static final String SWAP_IN_PERIOD = "nifi.swap.in.period"; + public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads"; + public static final String SWAP_OUT_PERIOD = "nifi.swap.out.period"; + + // provenance properties + public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation"; + public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory."; + public static final String PROVENANCE_MAX_STORAGE_TIME = 
"nifi.provenance.repository.max.storage.time"; + public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size"; + public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time"; + public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size"; + public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads"; + public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover"; + public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields"; + public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes"; + public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size"; + public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count"; + + // component status repository properties + public static final String COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION = "nifi.components.status.repository.implementation"; + public static final String COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "nifi.components.status.snapshot.frequency"; + + // encryptor properties + public static final String NF_SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key"; + public static final String NF_SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; + public static final String NF_SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider"; + + // security properties + public static final String SECURITY_KEYSTORE = "nifi.security.keystore"; + public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType"; + public static final String SECURITY_KEYSTORE_PASSWD = "nifi.security.keystorePasswd"; + public static final String SECURITY_KEY_PASSWD = "nifi.security.keyPasswd"; + public static final String SECURITY_TRUSTSTORE = 
"nifi.security.truststore"; + public static final String SECURITY_TRUSTSTORE_TYPE = "nifi.security.truststoreType"; + public static final String SECURITY_TRUSTSTORE_PASSWD = "nifi.security.truststorePasswd"; + public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.security.needClientAuth"; + public static final String SECURITY_USER_AUTHORITY_PROVIDER = "nifi.security.user.authority.provider"; + public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_PORT = "nifi.security.cluster.authority.provider.port"; + public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_THREADS = "nifi.security.cluster.authority.provider.threads"; + public static final String SECURITY_USER_CREDENTIAL_CACHE_DURATION = "nifi.security.user.credential.cache.duration"; + public static final String SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS = "nifi.security.support.new.account.requests"; + public static final String SECURITY_DEFAULT_USER_ROLES = "nifi.security.default.user.roles"; + public static final String SECURITY_OCSP_RESPONDER_URL = "nifi.security.ocsp.responder.url"; + public static final String SECURITY_OCSP_RESPONDER_CERTIFICATE = "nifi.security.ocsp.responder.certificate"; + + // web properties + public static final String WEB_WAR_DIR = "nifi.web.war.directory"; + public static final String WEB_HTTP_PORT = "nifi.web.http.port"; + public static final String WEB_HTTP_HOST = "nifi.web.http.host"; + public static final String WEB_HTTPS_PORT = "nifi.web.https.port"; + public static final String WEB_HTTPS_HOST = "nifi.web.https.host"; + public static final String WEB_WORKING_DIR = "nifi.web.jetty.working.directory"; + + // ui properties + public static final String UI_BANNER_TEXT = "nifi.ui.banner.text"; + public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval"; + + // cluster common properties + public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; + public static final String 
CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure"; + public static final String CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "nifi.cluster.protocol.socket.timeout"; + public static final String CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "nifi.cluster.protocol.connection.handshake.timeout"; + public static final String CLUSTER_PROTOCOL_USE_MULTICAST = "nifi.cluster.protocol.use.multicast"; + public static final String CLUSTER_PROTOCOL_MULTICAST_ADDRESS = "nifi.cluster.protocol.multicast.address"; + public static final String CLUSTER_PROTOCOL_MULTICAST_PORT = "nifi.cluster.protocol.multicast.port"; + public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "nifi.cluster.protocol.multicast.service.broadcast.delay"; + public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = "nifi.cluster.protocol.multicast.service.locator.attempts"; + public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "nifi.cluster.protocol.multicast.service.locator.attempts.delay"; + + // cluster node properties + public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node"; + public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; + public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; + public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads"; + public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address"; + public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port"; + + // cluster manager properties + public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager"; + public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address"; + public static final String CLUSTER_MANAGER_PROTOCOL_PORT = "nifi.cluster.manager.protocol.port"; + public static final String 
CLUSTER_MANAGER_NODE_FIREWALL_FILE = "nifi.cluster.manager.node.firewall.file"; + public static final String CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = "nifi.cluster.manager.node.event.history.size"; + public static final String CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "nifi.cluster.manager.node.api.connection.timeout"; + public static final String CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "nifi.cluster.manager.node.api.read.timeout"; + public static final String CLUSTER_MANAGER_NODE_API_REQUEST_THREADS = "nifi.cluster.manager.node.api.request.threads"; + public static final String CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "nifi.cluster.manager.flow.retrieval.delay"; + public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads"; + public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration"; + + // defaults + public static final String DEFAULT_TITLE = "NiFi"; + public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; + public static final String DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE = "conf/authority-providers.xml"; + public static final String DEFAULT_USER_CREDENTIAL_CACHE_DURATION = "24 hours"; + public static final Integer DEFAULT_REMOTE_INPUT_PORT = null; + public static final Path DEFAULT_TEMPLATE_DIRECTORY = Paths.get("conf", "templates"); + public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty"; + public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar"; + public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components"; + public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib"; + public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256"; + public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min"; + public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100; + public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000; + public static final String DEFAULT_SWAP_STORAGE_LOCATION = 
"./flowfile_repository/swap"; + public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec"; + public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec"; + public static final int DEFAULT_SWAP_IN_THREADS = 4; + public static final int DEFAULT_SWAP_OUT_THREADS = 4; + public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec"; + public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state"; + public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins"; + + // cluster common defaults + public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; + public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms"; + public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3; + public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; + public static final String DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "30 sec"; + public static final String DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "45 sec"; + + // cluster node defaults + public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2; + + // cluster manager defaults + public static final int DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = 10; + public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "30 sec"; + public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "30 sec"; + public static final int DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS = 10; + public static final String DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "5 sec"; + public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10; + public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec"; + + private NiFiProperties() { + super(); + } + + /** + * This is the method through which the NiFiProperties object should be + * obtained. 
+ * + * @return the NiFiProperties object to use + * @throws RuntimeException if unable to load properties file + */ + public static synchronized NiFiProperties getInstance() { + if (null == instance) { + final NiFiProperties suspectInstance = new NiFiProperties(); + final String nfPropertiesFilePath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH); + if (null == nfPropertiesFilePath || nfPropertiesFilePath.trim().length() == 0) { + throw new RuntimeException("Requires a system property called \'" + NiFiProperties.PROPERTIES_FILE_PATH + "\' and this is not set or has no value"); + } + final File propertiesFile = new File(nfPropertiesFilePath); + if (!propertiesFile.exists()) { + throw new RuntimeException("Properties file doesn't exist \'" + propertiesFile.getAbsolutePath() + "\'"); + } + if (!propertiesFile.canRead()) { + throw new RuntimeException("Properties file exists but cannot be read \'" + propertiesFile.getAbsolutePath() + "\'"); + } + InputStream inStream = null; + try { + inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); + suspectInstance.load(inStream); + } catch (final Exception ex) { + LOG.error("Cannot load properties file due to " + ex.getLocalizedMessage()); + throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex); + } finally { + if (null != inStream) { + try { + inStream.close(); + } catch (final Exception ex) { + /** + * do nothing * + */ + } + } + } + instance = suspectInstance; + } + return instance; + } + + // getters for core properties // + public File getFlowConfigurationFile() { + try { + return new File(getProperty(FLOW_CONFIGURATION_FILE)); + } catch (Exception ex) { + return null; + } + } + + public File getFlowConfigurationFileDir() { + try { + return getFlowConfigurationFile().getParentFile(); + } catch (Exception ex) { + return null; + } + } + + private Integer getPropertyAsPort(final String propertyName, final Integer defaultValue) { + final String port = 
getProperty(propertyName); + if (StringUtils.isEmpty(port)) { + return defaultValue; + } + try { + final int val = Integer.parseInt(port); + if (val <= 0 || val > 65535) { + throw new RuntimeException("Valid port range is 0 - 65535 but got " + val); + } + return val; + } catch (final NumberFormatException e) { + return defaultValue; + } + } + + public int getQueueSwapThreshold() { + final String thresholdValue = getProperty(QUEUE_SWAP_THRESHOLD); + if (thresholdValue == null) { + return DEFAULT_QUEUE_SWAP_THRESHOLD; + } + + try { + return Integer.parseInt(thresholdValue); + } catch (final NumberFormatException e) { + return DEFAULT_QUEUE_SWAP_THRESHOLD; + } + } + + public File getSwapStorageLocation() { + final String location = getProperty(SWAP_STORAGE_LOCATION); + if (location == null) { + return new File(DEFAULT_SWAP_STORAGE_LOCATION); + } else { + return new File(location); + } + } + + public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) { + final String value = getProperty(propertyName); + if (value == null) { + return defaultValue; + } + + try { + return Integer.parseInt(getProperty(propertyName)); + } catch (final Exception e) { + return defaultValue; + } + } + + public int getSwapInThreads() { + return getIntegerProperty(SWAP_IN_THREADS, DEFAULT_SWAP_IN_THREADS); + } + + public int getSwapOutThreads() { + final String value = getProperty(SWAP_OUT_THREADS); + if (value == null) { + return DEFAULT_SWAP_OUT_THREADS; + } + + try { + return Integer.parseInt(getProperty(SWAP_OUT_THREADS)); + } catch (final Exception e) { + return DEFAULT_SWAP_OUT_THREADS; + } + } + + public String getSwapInPeriod() { + return getProperty(SWAP_IN_PERIOD, DEFAULT_SWAP_IN_PERIOD); + } + + public String getSwapOutPeriod() { + return getProperty(SWAP_OUT_PERIOD, DEFAULT_SWAP_OUT_PERIOD); + } + + public String getAdministrativeYieldDuration() { + return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION); + } + + /** + * 
The socket port to listen on for a Remote Input Port. + * + * @return + */ + public Integer getRemoteInputPort() { + return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT); + } + + public Boolean isSiteToSiteSecure() { + final String secureVal = getProperty(SITE_TO_SITE_SECURE); + if (secureVal == null) { + return null; + } + + if ("true".equalsIgnoreCase(secureVal)) { + return true; + } + if ("false".equalsIgnoreCase(secureVal)) { + return false; + } + + throw new IllegalStateException("Property value for " + SITE_TO_SITE_SECURE + " is " + secureVal + "; expected 'true' or 'false'"); + } + + /** + * Returns the directory to which Templates are to be persisted + * + * @return + */ + public Path getTemplateDirectory() { + final String strVal = getProperty(TEMPLATE_DIRECTORY); + return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal); + } + + /** + * Get the flow service write delay. + * + * @return The write delay + */ + public String getFlowServiceWriteDelay() { + return getProperty(WRITE_DELAY_INTERVAL); + } + + /** + * Returns whether the processors should be started automatically when the + * application loads. 
+ * + * @return Whether to auto start the processors or not + */ + public boolean getAutoResumeState() { + final String rawAutoResumeState = getProperty(AUTO_RESUME_STATE, DEFAULT_AUTO_RESUME_STATE.toString()); + return Boolean.parseBoolean(rawAutoResumeState); + } + + /** + * Returns the number of partitions that should be used for the FlowFile + * Repository + * + * @return the partition count parsed from the configured property, or from + * DEFAULT_FLOWFILE_REPO_PARTITIONS when the property is not set + * @throws NumberFormatException if the value is not a valid integer + */ + public int getFlowFileRepositoryPartitions() { + final String rawProperty = getProperty(FLOWFILE_REPOSITORY_PARTITIONS, DEFAULT_FLOWFILE_REPO_PARTITIONS); + return Integer.parseInt(rawProperty); + } + + /** + * Returns the configured interval between FlowFileRepository checkpoints. + * The value is the raw property string, not a number of milliseconds; + * when the property is not set, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL is + * returned instead. + * + * @return the checkpoint interval as a string + */ + public String getFlowFileRepositoryCheckpointInterval() { + return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL); + } + + /** + * @return the restore directory or null if not configured + */ + public File getRestoreDirectory() { + final String value = getProperty(RESTORE_DIRECTORY); + if (StringUtils.isBlank(value)) { + return null; + } else { + return new File(value); + } + } + + /** + * @return the authority provider configuration file; falls back to + * DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE when the property is + * blank (per StringUtils.isBlank) + */ + public File getAuthorityProviderConfiguraitonFile() { + final String value = getProperty(AUTHORITY_PROVIDER_CONFIGURATION_FILE); + if (StringUtils.isBlank(value)) { + return new File(DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE); + } else { + return new File(value); + } + } + + /** + * Will default to true unless the value is explicitly set to false. 
+ * + * @return Whether client auth is required + */ + public boolean getNeedClientAuth() { + boolean needClientAuth = true; + String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH); + if ("false".equalsIgnoreCase(rawNeedClientAuth)) { + needClientAuth = false; + } + return needClientAuth; + } + + public String getUserCredentialCacheDuration() { + return getProperty(SECURITY_USER_CREDENTIAL_CACHE_DURATION, DEFAULT_USER_CREDENTIAL_CACHE_DURATION); + } + + public boolean getSupportNewAccountRequests() { + boolean shouldSupport = true; + String rawShouldSupport = getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS); + if ("false".equalsIgnoreCase(rawShouldSupport)) { + shouldSupport = false; + } + return shouldSupport; + } + + // getters for web properties // + public Integer getPort() { + Integer port = null; + try { + port = Integer.parseInt(getProperty(WEB_HTTP_PORT)); + } catch (NumberFormatException nfe) { + } + return port; + } + + public Integer getSslPort() { + Integer sslPort = null; + try { + sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT)); + } catch (NumberFormatException nfe) { + } + return sslPort; + } + + public File getWebWorkingDirectory() { + return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); + } + + public File getComponentDocumentationWorkingDirectory() { + return new File(getProperty(COMPONENT_DOCS_DIRECTORY, DEFAULT_COMPONENT_DOCS_DIRECTORY)); + } + + public File getNarWorkingDirectory() { + return new File(getProperty(NAR_WORKING_DIRECTORY, DEFAULT_NAR_WORKING_DIR)); + } + + public File getFrameworkWorkingDirectory() { + return new File(getNarWorkingDirectory(), "framework"); + } + + public File getExtensionsWorkingDirectory() { + return new File(getNarWorkingDirectory(), "extensions"); + } + + public File getNarLibraryDirectory() { + return new File(getProperty(NAR_LIBRARY_DIRECTORY, DEFAULT_NAR_LIBRARY_DIR)); + } + + // getters for ui properties // + /** + * Get the title for the UI. 
+ * + * @return The UI title + */ + public String getUiTitle() { + return this.getProperty(VERSION, DEFAULT_TITLE); + } + + /** + * Get the banner text. + * + * @return The banner text + */ + public String getBannerText() { + return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY); + } + + /** + * Returns the auto refresh interval in seconds. + * + * @return + */ + public String getAutoRefreshInterval() { + return getProperty(UI_AUTO_REFRESH_INTERVAL); + } + + // getters for cluster protocol properties // + public String getClusterProtocolHeartbeatInterval() { + return getProperty(CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + } + + public String getNodeHeartbeatInterval() { + return getClusterProtocolHeartbeatInterval(); + } + + public String getClusterProtocolSocketTimeout() { + return getProperty(CLUSTER_PROTOCOL_SOCKET_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT); + } + + public String getClusterProtocolConnectionHandshakeTimeout() { + return getProperty(CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT); + } + + public boolean getClusterProtocolUseMulticast() { + return Boolean.parseBoolean(getProperty(CLUSTER_PROTOCOL_USE_MULTICAST)); + } + + public InetSocketAddress getClusterProtocolMulticastAddress() { + try { + String multicastAddress = getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS); + int multicastPort = Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT)); + return new InetSocketAddress(multicastAddress, multicastPort); + } catch (Exception ex) { + throw new RuntimeException("Invalid multicast address/port due to: " + ex, ex); + } + } + + public String getClusterProtocolMulticastServiceBroadcastDelay() { + return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY); + } + + public File getPersistentStateDirectory() { + final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY, DEFAULT_PERSISTENT_STATE_DIRECTORY); + final File file = 
new File(dirName); + if (!file.exists()) { + file.mkdirs(); + } + return file; + } + + public int getClusterProtocolMulticastServiceLocatorAttempts() { + try { + return Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS; + } + } + + public String getClusterProtocolMulticastServiceLocatorAttemptsDelay() { + return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY, DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY); + } + + // getters for cluster node properties // + public boolean isNode() { + return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); + } + + public InetSocketAddress getClusterNodeProtocolAddress() { + try { + String socketAddress = getProperty(CLUSTER_NODE_ADDRESS); + if (StringUtils.isBlank(socketAddress)) { + socketAddress = "localhost"; + } + int socketPort = getClusterNodeProtocolPort(); + return InetSocketAddress.createUnresolved(socketAddress, socketPort); + } catch (Exception ex) { + throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex); + } + } + + public Integer getClusterNodeProtocolPort() { + try { + return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT)); + } catch (NumberFormatException nfe) { + return null; + } + } + + public int getClusterNodeProtocolThreads() { + try { + return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS; + } + } + + public InetSocketAddress getClusterNodeUnicastManagerProtocolAddress() { + try { + String socketAddress = getProperty(CLUSTER_NODE_UNICAST_MANAGER_ADDRESS); + if (StringUtils.isBlank(socketAddress)) { + socketAddress = "localhost"; + } + int socketPort = Integer.parseInt(getProperty(CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT)); + return InetSocketAddress.createUnresolved(socketAddress, 
socketPort); + } catch (Exception ex) { + throw new RuntimeException("Invalid unicast manager address/port due to: " + ex, ex); + } + } + + // getters for cluster manager properties // + public boolean isClusterManager() { + return Boolean.parseBoolean(getProperty(CLUSTER_IS_MANAGER)); + } + + public InetSocketAddress getClusterManagerProtocolAddress() { + try { + String socketAddress = getProperty(CLUSTER_MANAGER_ADDRESS); + if (StringUtils.isBlank(socketAddress)) { + socketAddress = "localhost"; + } + int socketPort = getClusterManagerProtocolPort(); + return InetSocketAddress.createUnresolved(socketAddress, socketPort); + } catch (Exception ex) { + throw new RuntimeException("Invalid manager protocol address/port due to: " + ex, ex); + } + } + + public Integer getClusterManagerProtocolPort() { + try { + return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_PORT)); + } catch (NumberFormatException nfe) { + return null; + } + } + + public File getClusterManagerNodeFirewallFile() { + final String firewallFile = getProperty(CLUSTER_MANAGER_NODE_FIREWALL_FILE); + if (StringUtils.isBlank(firewallFile)) { + return null; + } else { + return new File(firewallFile); + } + } + + public int getClusterManagerNodeEventHistorySize() { + try { + return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE; + } + } + + public String getClusterManagerNodeApiConnectionTimeout() { + return getProperty(CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT); + } + + public String getClusterManagerNodeApiReadTimeout() { + return getProperty(CLUSTER_MANAGER_NODE_API_READ_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT); + } + + public int getClusterManagerNodeApiRequestThreads() { + try { + return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_API_REQUEST_THREADS)); + } catch (NumberFormatException nfe) { + return 
DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS; + } + } + + public String getClusterManagerFlowRetrievalDelay() { + return getProperty(CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY, DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY); + } + + public int getClusterManagerProtocolThreads() { + try { + return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_THREADS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS; + } + } + + public String getClusterManagerSafeModeDuration() { + return getProperty(CLUSTER_MANAGER_SAFEMODE_DURATION, DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION); + } + + public String getClusterProtocolManagerToNodeApiScheme() { + final String isSecureProperty = getProperty(CLUSTER_PROTOCOL_IS_SECURE); + if (Boolean.valueOf(isSecureProperty)) { + return "https"; + } else { + return "http"; + } + } + + public InetSocketAddress getNodeApiAddress() { + + final String rawScheme = getClusterProtocolManagerToNodeApiScheme(); + final String scheme = (rawScheme == null) ? "http" : rawScheme; + + final String host; + final int port; + if ("http".equalsIgnoreCase(scheme)) { + // get host + if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) { + host = "localhost"; + } else { + host = getProperty(WEB_HTTP_HOST); + } + // get port + port = getPort(); + } else { + // get host + if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) { + host = "localhost"; + } else { + host = getProperty(WEB_HTTPS_HOST); + } + // get port + port = getSslPort(); + } + + return InetSocketAddress.createUnresolved(host, port); + + } + + /** + * Returns the database repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. 
+ * + * @return database repository path + * @throws InvalidPathException If the configured path is invalid + */ + public Path getDatabaseRepositoryPath() { + return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY)); + } + + /** + * Returns the flow file repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. + * + * @return database repository path + * @throws InvalidPathException If the configured path is invalid + */ + public Path getFlowFileRepositoryPath() { + return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY)); + } + + /** + * Returns the content repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. + * + * @return file repositories paths + * @throws InvalidPathException If any of the configured paths are invalid + */ + public Map<String, Path> getContentRepositoryPaths() { + final Map<String, Path> contentRepositoryPaths = new HashMap<>(); + + // go through each property + for (String propertyName : stringPropertyNames()) { + // determine if the property is a file repository path + if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { + // get the repository key + final String key = StringUtils.substringAfter(propertyName, REPOSITORY_CONTENT_PREFIX); + + // attempt to resolve the path specified + contentRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); + } + } + return contentRepositoryPaths; + } + + /** + * Returns the provenance repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. 
*
     * @return provenance repository paths, keyed by the part of the property
     * name that follows the provenance repository directory prefix
     */
    public Map<String, Path> getProvenanceRepositoryPaths() {
        final Map<String, Path> pathsByRepository = new HashMap<>();

        for (final String name : stringPropertyNames()) {
            // skip everything that is not a provenance repository entry
            if (!StringUtils.startsWith(name, PROVENANCE_REPO_DIRECTORY_PREFIX)) {
                continue;
            }

            final String repositoryKey = StringUtils.substringAfter(name, PROVENANCE_REPO_DIRECTORY_PREFIX);
            final Path repositoryPath = Paths.get(getProperty(name));
            pathsByRepository.put(repositoryKey, repositoryPath);
        }
        return pathsByRepository;
    }

    /**
     * @return the configured maximum number of FlowFiles per claim, or the
     * default when the property is missing or not a valid integer
     */
    public int getMaxFlowFilesPerClaim() {
        final String configured = getProperty(MAX_FLOWFILES_PER_CLAIM);
        try {
            return Integer.parseInt(configured);
        } catch (NumberFormatException ignored) {
            return DEFAULT_MAX_FLOWFILES_PER_CLAIM;
        }
    }

    /**
     * @return the raw configured maximum appendable claim size; no default is
     * applied
     */
    public String getMaxAppendableClaimSize() {
        final String size = getProperty(MAX_APPENDABLE_CLAIM_SIZE);
        return size;
    }

    /**
     * Looks up a property, treating a value that is empty after trimming the
     * same as a missing one.
     *
     * @param key the property key
     * @param defaultValue the value to fall back to
     * @return the configured value; {@code defaultValue} when the configured
     * value is blank; null when the lookup itself yields null
     */
    @Override
    public String getProperty(final String key, final String defaultValue) {
        final String resolved = super.getProperty(key, defaultValue);
        if (resolved != null && resolved.trim().isEmpty()) {
            return defaultValue;
        }
        return resolved;
    }

}