http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java deleted file mode 100644 index 10e348d..0000000 --- a/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ /dev/null @@ -1,876 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.nio.file.InvalidPathException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -public class NiFiProperties extends Properties { - - private static final long serialVersionUID = 2119177359005492702L; - - private static final Logger LOG = LoggerFactory.getLogger(NiFiProperties.class); - private static NiFiProperties instance = null; - - // core properties - public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; - public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file"; - public static final String FLOW_CONFIGURATION_ARCHIVE_FILE = "nifi.flow.configuration.archive.file"; - public static final String TASK_CONFIGURATION_FILE = "nifi.reporting.task.configuration.file"; - public static final String SERVICE_CONFIGURATION_FILE = "nifi.controller.service.configuration.file"; - public static final String AUTHORITY_PROVIDER_CONFIGURATION_FILE = "nifi.authority.provider.configuration.file"; - public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory"; - public static final String RESTORE_DIRECTORY = "nifi.restore.directory"; - public static final String VERSION = "nifi.version"; - public static final String WRITE_DELAY_INTERVAL = "nifi.flowservice.writedelay.interval"; - public static final String AUTO_RESUME_STATE = "nifi.flowcontroller.autoResumeState"; - public static final String FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.period"; - public static final String NAR_LIBRARY_DIRECTORY = "nifi.nar.library.directory"; - public static final String NAR_WORKING_DIRECTORY = "nifi.nar.working.directory"; - public 
static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory"; - public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key"; - public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; - public static final String SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider"; - public static final String H2_URL_APPEND = "nifi.h2.url.append"; - public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port"; - public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; - public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; - public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; - public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; - - // content repository properties - public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; - public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation"; - public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size"; - public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files"; - public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period"; - public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage"; - public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage"; - public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled"; - public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency"; - public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url"; - - // flowfile repository properties - public static 
final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation"; - public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync"; - public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory"; - public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions"; - public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval"; - public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation"; - public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold"; - public static final String SWAP_IN_THREADS = "nifi.swap.in.threads"; - public static final String SWAP_IN_PERIOD = "nifi.swap.in.period"; - public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads"; - public static final String SWAP_OUT_PERIOD = "nifi.swap.out.period"; - - // provenance properties - public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation"; - public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory."; - public static final String PROVENANCE_MAX_STORAGE_TIME = "nifi.provenance.repository.max.storage.time"; - public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size"; - public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time"; - public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size"; - public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads"; - public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover"; - public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields"; - public static final String 
PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes"; - public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size"; - public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count"; - - // component status repository properties - public static final String COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION = "nifi.components.status.repository.implementation"; - public static final String COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "nifi.components.status.snapshot.frequency"; - - // encryptor properties - public static final String NF_SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key"; - public static final String NF_SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; - public static final String NF_SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider"; - - // security properties - public static final String SECURITY_KEYSTORE = "nifi.security.keystore"; - public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType"; - public static final String SECURITY_KEYSTORE_PASSWD = "nifi.security.keystorePasswd"; - public static final String SECURITY_KEY_PASSWD = "nifi.security.keyPasswd"; - public static final String SECURITY_TRUSTSTORE = "nifi.security.truststore"; - public static final String SECURITY_TRUSTSTORE_TYPE = "nifi.security.truststoreType"; - public static final String SECURITY_TRUSTSTORE_PASSWD = "nifi.security.truststorePasswd"; - public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.security.needClientAuth"; - public static final String SECURITY_USER_AUTHORITY_PROVIDER = "nifi.security.user.authority.provider"; - public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_PORT = "nifi.security.cluster.authority.provider.port"; - public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_THREADS = "nifi.security.cluster.authority.provider.threads"; - public static final String SECURITY_USER_CREDENTIAL_CACHE_DURATION = 
"nifi.security.user.credential.cache.duration"; - public static final String SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS = "nifi.security.support.new.account.requests"; - public static final String SECURITY_DEFAULT_USER_ROLES = "nifi.security.default.user.roles"; - public static final String SECURITY_OCSP_RESPONDER_URL = "nifi.security.ocsp.responder.url"; - public static final String SECURITY_OCSP_RESPONDER_CERTIFICATE = "nifi.security.ocsp.responder.certificate"; - - // web properties - public static final String WEB_WAR_DIR = "nifi.web.war.directory"; - public static final String WEB_HTTP_PORT = "nifi.web.http.port"; - public static final String WEB_HTTP_HOST = "nifi.web.http.host"; - public static final String WEB_HTTPS_PORT = "nifi.web.https.port"; - public static final String WEB_HTTPS_HOST = "nifi.web.https.host"; - public static final String WEB_WORKING_DIR = "nifi.web.jetty.working.directory"; - public static final String WEB_THREADS = "nifi.web.jetty.threads"; - - // ui properties - public static final String UI_BANNER_TEXT = "nifi.ui.banner.text"; - public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval"; - - // cluster common properties - public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval"; - public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure"; - public static final String CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "nifi.cluster.protocol.socket.timeout"; - public static final String CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "nifi.cluster.protocol.connection.handshake.timeout"; - public static final String CLUSTER_PROTOCOL_USE_MULTICAST = "nifi.cluster.protocol.use.multicast"; - public static final String CLUSTER_PROTOCOL_MULTICAST_ADDRESS = "nifi.cluster.protocol.multicast.address"; - public static final String CLUSTER_PROTOCOL_MULTICAST_PORT = "nifi.cluster.protocol.multicast.port"; - public static final String 
CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "nifi.cluster.protocol.multicast.service.broadcast.delay"; - public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = "nifi.cluster.protocol.multicast.service.locator.attempts"; - public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "nifi.cluster.protocol.multicast.service.locator.attempts.delay"; - - // cluster node properties - public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node"; - public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; - public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; - public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads"; - public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address"; - public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port"; - - // cluster manager properties - public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager"; - public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address"; - public static final String CLUSTER_MANAGER_PROTOCOL_PORT = "nifi.cluster.manager.protocol.port"; - public static final String CLUSTER_MANAGER_NODE_FIREWALL_FILE = "nifi.cluster.manager.node.firewall.file"; - public static final String CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = "nifi.cluster.manager.node.event.history.size"; - public static final String CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "nifi.cluster.manager.node.api.connection.timeout"; - public static final String CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "nifi.cluster.manager.node.api.read.timeout"; - public static final String CLUSTER_MANAGER_NODE_API_REQUEST_THREADS = "nifi.cluster.manager.node.api.request.threads"; - public static final String CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = 
"nifi.cluster.manager.flow.retrieval.delay"; - public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads"; - public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration"; - - // defaults - public static final String DEFAULT_TITLE = "NiFi"; - public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; - public static final String DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE = "conf/authority-providers.xml"; - public static final String DEFAULT_USER_CREDENTIAL_CACHE_DURATION = "24 hours"; - public static final Integer DEFAULT_REMOTE_INPUT_PORT = null; - public static final Path DEFAULT_TEMPLATE_DIRECTORY = Paths.get("conf", "templates"); - public static final int DEFAULT_WEB_THREADS = 200; - public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty"; - public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar"; - public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components"; - public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib"; - public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256"; - public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min"; - public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100; - public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000; - public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap"; - public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec"; - public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec"; - public static final int DEFAULT_SWAP_IN_THREADS = 4; - public static final int DEFAULT_SWAP_OUT_THREADS = 4; - public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec"; - public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state"; - public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins"; - - // cluster common defaults - public static final String 
DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; - public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms"; - public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3; - public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; - public static final String DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "30 sec"; - public static final String DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "45 sec"; - - // cluster node defaults - public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2; - - // cluster manager defaults - public static final int DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = 10; - public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "30 sec"; - public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "30 sec"; - public static final int DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS = 10; - public static final String DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "5 sec"; - public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10; - public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec"; - - private NiFiProperties() { - super(); - } - - /** - * This is the method through which the NiFiProperties object should be - * obtained. 
- * - * @return the NiFiProperties object to use - * @throws RuntimeException if unable to load properties file - */ - public static synchronized NiFiProperties getInstance() { - if (null == instance) { - final NiFiProperties suspectInstance = new NiFiProperties(); - final String nfPropertiesFilePath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH); - if (null == nfPropertiesFilePath || nfPropertiesFilePath.trim().length() == 0) { - throw new RuntimeException("Requires a system property called \'" + NiFiProperties.PROPERTIES_FILE_PATH + "\' and this is not set or has no value"); - } - final File propertiesFile = new File(nfPropertiesFilePath); - if (!propertiesFile.exists()) { - throw new RuntimeException("Properties file doesn't exist \'" + propertiesFile.getAbsolutePath() + "\'"); - } - if (!propertiesFile.canRead()) { - throw new RuntimeException("Properties file exists but cannot be read \'" + propertiesFile.getAbsolutePath() + "\'"); - } - InputStream inStream = null; - try { - inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); - suspectInstance.load(inStream); - } catch (final Exception ex) { - LOG.error("Cannot load properties file due to " + ex.getLocalizedMessage()); - throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex); - } finally { - if (null != inStream) { - try { - inStream.close(); - } catch (final Exception ex) { - /** - * do nothing * - */ - } - } - } - instance = suspectInstance; - } - return instance; - } - - // getters for core properties // - public File getFlowConfigurationFile() { - try { - return new File(getProperty(FLOW_CONFIGURATION_FILE)); - } catch (Exception ex) { - return null; - } - } - - public File getFlowConfigurationFileDir() { - try { - return getFlowConfigurationFile().getParentFile(); - } catch (Exception ex) { - return null; - } - } - - private Integer getPropertyAsPort(final String propertyName, final Integer defaultValue) { - final String port = 
getProperty(propertyName); - if (StringUtils.isEmpty(port)) { - return defaultValue; - } - try { - final int val = Integer.parseInt(port); - if (val <= 0 || val > 65535) { - throw new RuntimeException("Valid port range is 0 - 65535 but got " + val); - } - return val; - } catch (final NumberFormatException e) { - return defaultValue; - } - } - - public int getQueueSwapThreshold() { - final String thresholdValue = getProperty(QUEUE_SWAP_THRESHOLD); - if (thresholdValue == null) { - return DEFAULT_QUEUE_SWAP_THRESHOLD; - } - - try { - return Integer.parseInt(thresholdValue); - } catch (final NumberFormatException e) { - return DEFAULT_QUEUE_SWAP_THRESHOLD; - } - } - - public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) { - final String value = getProperty(propertyName); - if (value == null) { - return defaultValue; - } - - try { - return Integer.parseInt(getProperty(propertyName)); - } catch (final Exception e) { - return defaultValue; - } - } - - public int getSwapInThreads() { - return getIntegerProperty(SWAP_IN_THREADS, DEFAULT_SWAP_IN_THREADS); - } - - public int getSwapOutThreads() { - final String value = getProperty(SWAP_OUT_THREADS); - if (value == null) { - return DEFAULT_SWAP_OUT_THREADS; - } - - try { - return Integer.parseInt(getProperty(SWAP_OUT_THREADS)); - } catch (final Exception e) { - return DEFAULT_SWAP_OUT_THREADS; - } - } - - public String getSwapInPeriod() { - return getProperty(SWAP_IN_PERIOD, DEFAULT_SWAP_IN_PERIOD); - } - - public String getSwapOutPeriod() { - return getProperty(SWAP_OUT_PERIOD, DEFAULT_SWAP_OUT_PERIOD); - } - - public String getAdministrativeYieldDuration() { - return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION); - } - - /** - * The socket port to listen on for a Remote Input Port. 
- * - * @return - */ - public Integer getRemoteInputPort() { - return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT); - } - - /** - * @return False if property value is 'false'; True otherwise. - */ - public Boolean isSiteToSiteSecure() { - final String secureVal = getProperty(SITE_TO_SITE_SECURE, "true"); - - if ("false".equalsIgnoreCase(secureVal)) { - return false; - }else{ - return true; - } - - } - - /** - * Returns the directory to which Templates are to be persisted - * - * @return - */ - public Path getTemplateDirectory() { - final String strVal = getProperty(TEMPLATE_DIRECTORY); - return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal); - } - - /** - * Get the flow service write delay. - * - * @return The write delay - */ - public String getFlowServiceWriteDelay() { - return getProperty(WRITE_DELAY_INTERVAL); - } - - /** - * Returns whether the processors should be started automatically when the - * application loads. - * - * @return Whether to auto start the processors or not - */ - public boolean getAutoResumeState() { - final String rawAutoResumeState = getProperty(AUTO_RESUME_STATE, DEFAULT_AUTO_RESUME_STATE.toString()); - return Boolean.parseBoolean(rawAutoResumeState); - } - - /** - * Returns the number of partitions that should be used for the FlowFile - * Repository - * - * @return - */ - public int getFlowFileRepositoryPartitions() { - final String rawProperty = getProperty(FLOWFILE_REPOSITORY_PARTITIONS, DEFAULT_FLOWFILE_REPO_PARTITIONS); - return Integer.parseInt(rawProperty); - } - - /** - * Returns the number of milliseconds between FlowFileRepository - * checkpointing - * - * @return - */ - public String getFlowFileRepositoryCheckpointInterval() { - return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL); - } - - /** - * @return the restore directory or null if not configured - */ - public File getRestoreDirectory() { - final String value = 
getProperty(RESTORE_DIRECTORY); - if (StringUtils.isBlank(value)) { - return null; - } else { - return new File(value); - } - } - - /** - * @return the user authorities file - */ - public File getAuthorityProviderConfiguraitonFile() { - final String value = getProperty(AUTHORITY_PROVIDER_CONFIGURATION_FILE); - if (StringUtils.isBlank(value)) { - return new File(DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE); - } else { - return new File(value); - } - } - - /** - * Will default to true unless the value is explicitly set to false. - * - * @return Whether client auth is required - */ - public boolean getNeedClientAuth() { - boolean needClientAuth = true; - String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH); - if ("false".equalsIgnoreCase(rawNeedClientAuth)) { - needClientAuth = false; - } - return needClientAuth; - } - - public String getUserCredentialCacheDuration() { - return getProperty(SECURITY_USER_CREDENTIAL_CACHE_DURATION, DEFAULT_USER_CREDENTIAL_CACHE_DURATION); - } - - public boolean getSupportNewAccountRequests() { - boolean shouldSupport = true; - String rawShouldSupport = getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS); - if ("false".equalsIgnoreCase(rawShouldSupport)) { - shouldSupport = false; - } - return shouldSupport; - } - - // getters for web properties // - public Integer getPort() { - Integer port = null; - try { - port = Integer.parseInt(getProperty(WEB_HTTP_PORT)); - } catch (NumberFormatException nfe) { - } - return port; - } - - public Integer getSslPort() { - Integer sslPort = null; - try { - sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT)); - } catch (NumberFormatException nfe) { - } - return sslPort; - } - - public int getWebThreads() { - return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS); - } - - public File getWebWorkingDirectory() { - return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); - } - - public File getComponentDocumentationWorkingDirectory() { - return new 
File(getProperty(COMPONENT_DOCS_DIRECTORY, DEFAULT_COMPONENT_DOCS_DIRECTORY)); - } - - public File getNarWorkingDirectory() { - return new File(getProperty(NAR_WORKING_DIRECTORY, DEFAULT_NAR_WORKING_DIR)); - } - - public File getFrameworkWorkingDirectory() { - return new File(getNarWorkingDirectory(), "framework"); - } - - public File getExtensionsWorkingDirectory() { - return new File(getNarWorkingDirectory(), "extensions"); - } - - public File getNarLibraryDirectory() { - return new File(getProperty(NAR_LIBRARY_DIRECTORY, DEFAULT_NAR_LIBRARY_DIR)); - } - - // getters for ui properties // - /** - * Get the title for the UI. - * - * @return The UI title - */ - public String getUiTitle() { - return this.getProperty(VERSION, DEFAULT_TITLE); - } - - /** - * Get the banner text. - * - * @return The banner text - */ - public String getBannerText() { - return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY); - } - - /** - * Returns the auto refresh interval in seconds. - * - * @return - */ - public String getAutoRefreshInterval() { - return getProperty(UI_AUTO_REFRESH_INTERVAL); - } - - // getters for cluster protocol properties // - public String getClusterProtocolHeartbeatInterval() { - return getProperty(CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); - } - - public String getNodeHeartbeatInterval() { - return getClusterProtocolHeartbeatInterval(); - } - - public String getClusterProtocolSocketTimeout() { - return getProperty(CLUSTER_PROTOCOL_SOCKET_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT); - } - - public String getClusterProtocolConnectionHandshakeTimeout() { - return getProperty(CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT); - } - - public boolean getClusterProtocolUseMulticast() { - return Boolean.parseBoolean(getProperty(CLUSTER_PROTOCOL_USE_MULTICAST)); - } - - public InetSocketAddress getClusterProtocolMulticastAddress() { - try { - String multicastAddress = 
getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS); - int multicastPort = Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT)); - return new InetSocketAddress(multicastAddress, multicastPort); - } catch (Exception ex) { - throw new RuntimeException("Invalid multicast address/port due to: " + ex, ex); - } - } - - public String getClusterProtocolMulticastServiceBroadcastDelay() { - return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY); - } - - public File getPersistentStateDirectory() { - final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY, DEFAULT_PERSISTENT_STATE_DIRECTORY); - final File file = new File(dirName); - if (!file.exists()) { - file.mkdirs(); - } - return file; - } - - public int getClusterProtocolMulticastServiceLocatorAttempts() { - try { - return Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS)); - } catch (NumberFormatException nfe) { - return DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS; - } - } - - public String getClusterProtocolMulticastServiceLocatorAttemptsDelay() { - return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY, DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY); - } - - // getters for cluster node properties // - public boolean isNode() { - return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); - } - - public InetSocketAddress getClusterNodeProtocolAddress() { - try { - String socketAddress = getProperty(CLUSTER_NODE_ADDRESS); - if (StringUtils.isBlank(socketAddress)) { - socketAddress = "localhost"; - } - int socketPort = getClusterNodeProtocolPort(); - return InetSocketAddress.createUnresolved(socketAddress, socketPort); - } catch (Exception ex) { - throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex); - } - } - - public Integer getClusterNodeProtocolPort() { - try { - return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT)); - } catch (NumberFormatException nfe) { - 
return null; - } - } - - public int getClusterNodeProtocolThreads() { - try { - return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS)); - } catch (NumberFormatException nfe) { - return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS; - } - } - - public InetSocketAddress getClusterNodeUnicastManagerProtocolAddress() { - try { - String socketAddress = getProperty(CLUSTER_NODE_UNICAST_MANAGER_ADDRESS); - if (StringUtils.isBlank(socketAddress)) { - socketAddress = "localhost"; - } - int socketPort = Integer.parseInt(getProperty(CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT)); - return InetSocketAddress.createUnresolved(socketAddress, socketPort); - } catch (Exception ex) { - throw new RuntimeException("Invalid unicast manager address/port due to: " + ex, ex); - } - } - - // getters for cluster manager properties // - public boolean isClusterManager() { - return Boolean.parseBoolean(getProperty(CLUSTER_IS_MANAGER)); - } - - public InetSocketAddress getClusterManagerProtocolAddress() { - try { - String socketAddress = getProperty(CLUSTER_MANAGER_ADDRESS); - if (StringUtils.isBlank(socketAddress)) { - socketAddress = "localhost"; - } - int socketPort = getClusterManagerProtocolPort(); - return InetSocketAddress.createUnresolved(socketAddress, socketPort); - } catch (Exception ex) { - throw new RuntimeException("Invalid manager protocol address/port due to: " + ex, ex); - } - } - - public Integer getClusterManagerProtocolPort() { - try { - return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_PORT)); - } catch (NumberFormatException nfe) { - return null; - } - } - - public File getClusterManagerNodeFirewallFile() { - final String firewallFile = getProperty(CLUSTER_MANAGER_NODE_FIREWALL_FILE); - if (StringUtils.isBlank(firewallFile)) { - return null; - } else { - return new File(firewallFile); - } - } - - public int getClusterManagerNodeEventHistorySize() { - try { - return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE)); - } catch 
(NumberFormatException nfe) { - return DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE; - } - } - - public String getClusterManagerNodeApiConnectionTimeout() { - return getProperty(CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT); - } - - public String getClusterManagerNodeApiReadTimeout() { - return getProperty(CLUSTER_MANAGER_NODE_API_READ_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT); - } - - public int getClusterManagerNodeApiRequestThreads() { - try { - return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_API_REQUEST_THREADS)); - } catch (NumberFormatException nfe) { - return DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS; - } - } - - public String getClusterManagerFlowRetrievalDelay() { - return getProperty(CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY, DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY); - } - - public int getClusterManagerProtocolThreads() { - try { - return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_THREADS)); - } catch (NumberFormatException nfe) { - return DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS; - } - } - - public String getClusterManagerSafeModeDuration() { - return getProperty(CLUSTER_MANAGER_SAFEMODE_DURATION, DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION); - } - - public String getClusterProtocolManagerToNodeApiScheme() { - final String isSecureProperty = getProperty(CLUSTER_PROTOCOL_IS_SECURE); - if (Boolean.valueOf(isSecureProperty)) { - return "https"; - } else { - return "http"; - } - } - - public InetSocketAddress getNodeApiAddress() { - - final String rawScheme = getClusterProtocolManagerToNodeApiScheme(); - final String scheme = (rawScheme == null) ? 
"http" : rawScheme; - - final String host; - final int port; - if ("http".equalsIgnoreCase(scheme)) { - // get host - if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) { - host = "localhost"; - } else { - host = getProperty(WEB_HTTP_HOST); - } - // get port - port = getPort(); - } else { - // get host - if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) { - host = "localhost"; - } else { - host = getProperty(WEB_HTTPS_HOST); - } - // get port - port = getSslPort(); - } - - return InetSocketAddress.createUnresolved(host, port); - - } - - /** - * Returns the database repository path. It simply returns the value - * configured. No directories will be created as a result of this operation. - * - * @return database repository path - * @throws InvalidPathException If the configured path is invalid - */ - public Path getDatabaseRepositoryPath() { - return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY)); - } - - /** - * Returns the flow file repository path. It simply returns the value - * configured. No directories will be created as a result of this operation. - * - * @return database repository path - * @throws InvalidPathException If the configured path is invalid - */ - public Path getFlowFileRepositoryPath() { - return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY)); - } - - /** - * Returns the content repository paths. This method returns a mapping of - * file repository name to file repository paths. It simply returns the - * values configured. No directories will be created as a result of this - * operation. 
- * - * @return file repositories paths - * @throws InvalidPathException If any of the configured paths are invalid - */ - public Map<String, Path> getContentRepositoryPaths() { - final Map<String, Path> contentRepositoryPaths = new HashMap<>(); - - // go through each property - for (String propertyName : stringPropertyNames()) { - // determine if the property is a file repository path - if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { - // get the repository key - final String key = StringUtils.substringAfter(propertyName, REPOSITORY_CONTENT_PREFIX); - - // attempt to resolve the path specified - contentRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); - } - } - return contentRepositoryPaths; - } - - /** - * Returns the provenance repository paths. This method returns a mapping of - * file repository name to file repository paths. It simply returns the - * values configured. No directories will be created as a result of this - * operation. - * - * @return - */ - public Map<String, Path> getProvenanceRepositoryPaths() { - final Map<String, Path> provenanceRepositoryPaths = new HashMap<>(); - - // go through each property - for (String propertyName : stringPropertyNames()) { - // determine if the property is a file repository path - if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) { - // get the repository key - final String key = StringUtils.substringAfter(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX); - - // attempt to resolve the path specified - provenanceRepositoryPaths.put(key, Paths.get(getProperty(propertyName))); - } - } - return provenanceRepositoryPaths; - } - - public int getMaxFlowFilesPerClaim() { - try { - return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM)); - } catch (NumberFormatException nfe) { - return DEFAULT_MAX_FLOWFILES_PER_CLAIM; - } - } - - public String getMaxAppendableClaimSize() { - return getProperty(MAX_APPENDABLE_CLAIM_SIZE); - } - - @Override - public String 
getProperty(final String key, final String defaultValue) { - final String value = super.getProperty(key, defaultValue); - if (value == null) { - return null; - } - - if (value.trim().isEmpty()) { - return defaultValue; - } - return value; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java b/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java deleted file mode 100644 index aa6f8f3..0000000 --- a/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -/** - * String Utils based on the Apache Commons Lang String Utils. 
/**
 * A handful of null-safe string helpers modelled on the Apache Commons Lang
 * {@code StringUtils}. Keeping them local avoids a commons-lang dependency in
 * the core.
 */
public class StringUtils {

    /** The empty string. */
    public static final String EMPTY = "";

    /**
     * Tells whether a string has no visible content.
     *
     * @param str the string to inspect; may be null
     * @return true if {@code str} is null, empty, or made up solely of
     * characters for which {@link Character#isWhitespace(char)} is true
     */
    public static boolean isBlank(final String str) {
        final int length = (str == null) ? 0 : str.length();
        int index = 0;
        // skip over leading whitespace; a blank string is whitespace all the way
        while (index < length && Character.isWhitespace(str.charAt(index))) {
            index++;
        }
        return index == length;
    }

    /**
     * Tells whether a string is null or has zero length.
     *
     * @param str the string to inspect; may be null
     * @return true if {@code str} is null or {@code ""}
     */
    public static boolean isEmpty(final String str) {
        return str == null || str.length() == 0;
    }

    /**
     * Null-safe, case-sensitive prefix check.
     *
     * @param str the string to inspect; may be null
     * @param prefix the prefix to look for; may be null
     * @return true if {@code str} begins with {@code prefix}, or if both
     * arguments are null; false if exactly one of them is null
     */
    public static boolean startsWith(final String str, final String prefix) {
        if (str != null && prefix != null) {
            return str.startsWith(prefix);
        }
        return str == null && prefix == null;
    }

    /**
     * Returns the text that follows the first occurrence of a separator.
     *
     * @param str the string to search; may be null
     * @param separator the separator to look for; may be null
     * @return {@code str} itself when it is null or empty; otherwise the text
     * after the first occurrence of {@code separator}, or the empty string
     * when {@code separator} is null or does not occur
     */
    public static String substringAfter(final String str, final String separator) {
        if (str == null || str.isEmpty()) {
            return str;
        }
        final int start = (separator == null) ? -1 : str.indexOf(separator);
        if (start < 0) {
            return EMPTY;
        }
        return str.substring(start + separator.length());
    }
}
- The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-commons-parent</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - - <artifactId>nifi-security-utils</artifactId> - <version>0.0.1-SNAPSHOT</version> - <name>NiFi Security Utils</name> - <description>Contains security functionality.</description> - - <dependencies> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - </dependencies> -</project> - http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java deleted file mode 100644 index 087d891..0000000 --- a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.security.util; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.net.URL; -import java.security.KeyStore; -import java.security.cert.CertificateParsingException; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class CertificateUtils { - - private static final Logger logger = LoggerFactory.getLogger(CertificateUtils.class); - - /** - * Returns true if the given keystore can be loaded using the given keystore - * type and password. Returns false otherwise. 
- * @param keystore - * @param keystoreType - * @param password - * @return - */ - public static boolean isStoreValid(final URL keystore, final KeystoreType keystoreType, final char[] password) { - - if (keystore == null) { - throw new IllegalArgumentException("keystore may not be null"); - } else if (keystoreType == null) { - throw new IllegalArgumentException("keystore type may not be null"); - } else if (password == null) { - throw new IllegalArgumentException("password may not be null"); - } - - BufferedInputStream bis = null; - final KeyStore ks; - try { - - // load the keystore - bis = new BufferedInputStream(keystore.openStream()); - ks = KeyStore.getInstance(keystoreType.name()); - ks.load(bis, password); - - return true; - - } catch (Exception e) { - return false; - } finally { - if (bis != null) { - try { - bis.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close input stream", ioe); - } - } - } - } - - /** - * Extracts the username from the specified DN. If the username cannot be - * extracted because the CN is in an unrecognized format, the entire CN is - * returned. If the CN cannot be extracted because the DN is in an - * unrecognized format, the entire DN is returned. 
- * - * @param dn - * @return - */ - public static String extractUsername(String dn) { - String username = dn; - String cn = ""; - - // ensure the dn is specified - if (StringUtils.isNotBlank(dn)) { - - // attempt to locate the cn - if (dn.startsWith("CN=")) { - cn = StringUtils.substringBetween(dn, "CN=", ","); - } else if (dn.startsWith("/CN=")) { - cn = StringUtils.substringBetween(dn, "CN=", "/"); - } else if (dn.startsWith("C=") || dn.startsWith("/C=")) { - cn = StringUtils.substringAfter(dn, "CN="); - } else if (dn.startsWith("/") && StringUtils.contains(dn, "CN=")) { - cn = StringUtils.substringAfter(dn, "CN="); - } - - // attempt to get the username from the cn - if (StringUtils.isNotBlank(cn)) { - if (cn.endsWith(")")) { - username = StringUtils.substringBetween(cn, "(", ")"); - } else if (cn.contains(" ")) { - username = StringUtils.substringAfterLast(cn, " "); - } else { - username = cn; - } - } - } - - return username; - } - - /** - * Returns a list of subject alternative names. Any name that is represented - * as a String by X509Certificate.getSubjectAlternativeNames() is converted - * to lowercase and returned. - * - * @param certificate a certificate - * @return a list of subject alternative names; list is never null - * @throws CertificateParsingException if parsing the certificate failed - */ - public static List<String> getSubjectAlternativeNames(final X509Certificate certificate) throws CertificateParsingException { - - final Collection<List<?>> altNames = certificate.getSubjectAlternativeNames(); - if (altNames == null) { - return new ArrayList<>(); - } - - final List<String> result = new ArrayList<>(); - for (final List<?> generalName : altNames) { - /* - * generalName has the name type as the first element a String or - * byte array for the second element. We return any general names - * that are String types. 
- * - * We don't inspect the numeric name type because some certificates - * incorrectly put IPs and DNS names under the wrong name types. - */ - final Object value = generalName.get(1); - if (value instanceof String) { - result.add(((String) value).toLowerCase()); - } - - } - - return result; - } - - private CertificateUtils() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java ---------------------------------------------------------------------- diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java deleted file mode 100644 index 741fdde..0000000 --- a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.security.util; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; - -/** - * Enumeration capturing essential information about the various encryption - * methods that might be supported. - * - * @author none - */ -public enum EncryptionMethod { - - MD5_128AES("PBEWITHMD5AND128BITAES-CBC-OPENSSL", "BC", false), - MD5_256AES("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", false), - SHA1_RC2("PBEWITHSHA1ANDRC2", "BC", false), - SHA1_DES("PBEWITHSHA1ANDDES", "BC", false), - MD5_192AES("PBEWITHMD5AND192BITAES-CBC-OPENSSL", "BC", false), - MD5_DES("PBEWITHMD5ANDDES", "BC", false), - MD5_RC2("PBEWITHMD5ANDRC2", "BC", false), - SHA_192AES("PBEWITHSHAAND192BITAES-CBC-BC", "BC", true), - SHA_40RC4("PBEWITHSHAAND40BITRC4", "BC", true), - SHA256_128AES("PBEWITHSHA256AND128BITAES-CBC-BC", "BC", true), - SHA_128RC2("PBEWITHSHAAND128BITRC2-CBC", "BC", true), - SHA_128AES("PBEWITHSHAAND128BITAES-CBC-BC", "BC", true), - SHA256_192AES("PBEWITHSHA256AND192BITAES-CBC-BC", "BC", true), - SHA_2KEYTRIPLEDES("PBEWITHSHAAND2-KEYTRIPLEDES-CBC", "BC", true), - SHA256_256AES("PBEWITHSHA256AND256BITAES-CBC-BC", "BC", true), - SHA_40RC2("PBEWITHSHAAND40BITRC2-CBC", "BC", true), - SHA_256AES("PBEWITHSHAAND256BITAES-CBC-BC", "BC", true), - SHA_3KEYTRIPLEDES("PBEWITHSHAAND3-KEYTRIPLEDES-CBC", "BC", true), - SHA_TWOFISH("PBEWITHSHAANDTWOFISH-CBC", "BC", true), - SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true); - private final String algorithm; - private final String provider; - private final boolean unlimitedStrength; - - EncryptionMethod(String algorithm, String provider, boolean unlimitedStrength) { - this.algorithm = algorithm; - this.provider = provider; - this.unlimitedStrength = unlimitedStrength; - } - - public String getProvider() { - return provider; - } - - public String getAlgorithm() { - return algorithm; - } - - /** - * @return true if algorithm requires unlimited strength policies - */ - public 
boolean isUnlimitedStrength() { - return unlimitedStrength; - } - - @Override - public String toString() { - final ToStringBuilder builder = new ToStringBuilder(this); - ToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE); - builder.append("algorithm name", algorithm); - builder.append("Requires unlimited strength JCE policy", unlimitedStrength); - builder.append("Algorithm Provider", provider); - return builder.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java ---------------------------------------------------------------------- diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java deleted file mode 100644 index 18574bb..0000000 --- a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.security.util; - -/** - * Keystore types. 
/**
 * The keystore types known to the security utilities. The constant's
 * {@code name()} is what {@code CertificateUtils.isStoreValid} hands to
 * {@code KeyStore.getInstance}.
 */
public enum KeystoreType {

    /** The {@code "PKCS12"} keystore type. */
    PKCS12,

    /** The {@code "JKS"} keystore type. */
    JKS
}
/**
 * Types of security stores and their related Java system properties.
 */
public enum SecurityStoreTypes {

    TRUSTSTORE(
            "javax.net.ssl.trustStore",
            "javax.net.ssl.trustStorePassword",
            "javax.net.ssl.trustStoreType"),
    KEYSTORE(
            "javax.net.ssl.keyStore",
            "javax.net.ssl.keyStorePassword",
            "javax.net.ssl.keyStoreType");

    /**
     * Logs the keystore and truststore Java system property values to the given
     * writer. If logPasswords is true, then the keystore and truststore
     * password property values are logged as well. The writer is flushed but
     * not closed. Nothing happens when the writer is null.
     *
     * @param writer a writer to log to
     *
     * @param logPasswords true if passwords should be logged; false otherwise
     */
    public static void logProperties(final Writer writer,
            final boolean logPasswords) {
        if (writer == null) {
            return;
        }

        final PrintWriter pw = new PrintWriter(writer);

        // keystore first, then truststore
        printStore(pw, KEYSTORE, logPasswords);
        printStore(pw, TRUSTSTORE, logPasswords);

        pw.flush();
    }

    /**
     * Prints the path, optionally the password, and the type system properties
     * of one store as {@code name = value} lines.
     */
    private static void printStore(final PrintWriter pw, final SecurityStoreTypes store,
            final boolean logPasswords) {
        pw.println(store.getStoreProperty() + " = "
                + System.getProperty(store.getStoreProperty()));

        if (logPasswords) {
            // read the password property itself; the earlier code printed the
            // store path here by mistake
            pw.println(store.getStorePasswordProperty() + " = "
                    + System.getProperty(store.getStorePasswordProperty()));
        }

        pw.println(store.getStoreTypeProperty() + " = "
                + System.getProperty(store.getStoreTypeProperty()));
    }

    /**
     * the Java system property for setting the keystore (or truststore) path
     */
    private final String storeProperty;

    /**
     * the Java system property for setting the keystore (or truststore)
     * password
     */
    private final String storePasswordProperty;

    /**
     * the Java system property for setting the keystore (or truststore) type
     */
    private final String storeTypeProperty;

    /**
     * Creates an instance.
     *
     * @param storeProperty the Java system property for setting the keystore
     * (or truststore) path
     * @param storePasswordProperty the Java system property for setting the
     * keystore (or truststore) password
     * @param storeTypeProperty the Java system property for setting the
     * keystore (or truststore) type
     */
    SecurityStoreTypes(final String storeProperty,
            final String storePasswordProperty,
            final String storeTypeProperty) {
        this.storeProperty = storeProperty;
        this.storePasswordProperty = storePasswordProperty;
        this.storeTypeProperty = storeTypeProperty;
    }

    /**
     * Returns the keystore (or truststore) property.
     *
     * @return the keystore (or truststore) property
     */
    public String getStoreProperty() {
        return storeProperty;
    }

    /**
     * Returns the keystore (or truststore) password property.
     *
     * @return the keystore (or truststore) password property
     */
    public String getStorePasswordProperty() {
        return storePasswordProperty;
    }

    /**
     * Returns the keystore (or truststore) type property.
     *
     * @return the keystore (or truststore) type property
     */
    public String getStoreTypeProperty() {
        return storeTypeProperty;
    }
}
/**
 * A factory for creating SSL contexts from keystore and/or truststore files.
 */
public final class SslContextFactory {

    /**
     * Client authentication modes accepted by the seven-argument
     * {@code createSslContext}.
     */
    public static enum ClientAuth {

        WANT,
        REQUIRED,
        NONE
    }

    /**
     * Creates a "TLS" SSLContext backed by both a keystore and a truststore.
     *
     * @param keystore the full path to the keystore
     * @param keystorePasswd the keystore password
     * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
     * @param truststore the full path to the truststore
     * @param truststorePasswd the truststore password
     * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
     * @param clientAuth the type of client authentication
     *
     * @return a SSLContext instance
     * @throws java.security.KeyStoreException
     * @throws java.io.IOException
     * @throws java.security.NoSuchAlgorithmException
     * @throws java.security.cert.CertificateException
     * @throws java.security.UnrecoverableKeyException
     * @throws java.security.KeyManagementException
     */
    public static SSLContext createSslContext(
            final String keystore, final char[] keystorePasswd, final String keystoreType,
            final String truststore, final char[] truststorePasswd, final String truststoreType,
            final ClientAuth clientAuth)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
            UnrecoverableKeyException, KeyManagementException {

        final KeyManager[] keyManagers = loadKeyManagers(keystore, keystorePasswd, keystoreType);
        final TrustManager[] trustManagers = loadTrustManagers(truststore, truststorePasswd, truststoreType);

        final SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(keyManagers, trustManagers, new SecureRandom());

        // NOTE(review): getDefaultSSLParameters() appears to hand back a fresh
        // copy on each call, so these setters may not influence the returned
        // context - confirm against the SSLContext documentation.
        if (clientAuth == ClientAuth.REQUIRED) {
            sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
        } else {
            sslContext.getDefaultSSLParameters().setWantClientAuth(clientAuth == ClientAuth.WANT);
        }

        return sslContext;
    }

    /**
     * Creates a "TLS" SSLContext backed by a keystore only; it is initialized
     * with an empty trust manager array.
     *
     * @param keystore the full path to the keystore
     * @param keystorePasswd the keystore password
     * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
     *
     * @return a SSLContext instance
     * @throws java.security.KeyStoreException
     * @throws java.io.IOException
     * @throws java.security.NoSuchAlgorithmException
     * @throws java.security.cert.CertificateException
     * @throws java.security.UnrecoverableKeyException
     * @throws java.security.KeyManagementException
     */
    public static SSLContext createSslContext(
            final String keystore, final char[] keystorePasswd, final String keystoreType)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
            UnrecoverableKeyException, KeyManagementException {

        final KeyManager[] keyManagers = loadKeyManagers(keystore, keystorePasswd, keystoreType);

        final SSLContext context = SSLContext.getInstance("TLS");
        context.init(keyManagers, new TrustManager[0], new SecureRandom());
        return context;
    }

    /**
     * Creates a "TLS" SSLContext backed by a truststore only; it is initialized
     * with an empty key manager array.
     *
     * @param truststore the full path to the truststore
     * @param truststorePasswd the truststore password
     * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
     *
     * @return a SSLContext instance
     * @throws java.security.KeyStoreException
     * @throws java.io.IOException
     * @throws java.security.NoSuchAlgorithmException
     * @throws java.security.cert.CertificateException
     * @throws java.security.UnrecoverableKeyException
     * @throws java.security.KeyManagementException
     */
    public static SSLContext createTrustSslContext(
            final String truststore, final char[] truststorePasswd, final String truststoreType)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
            UnrecoverableKeyException, KeyManagementException {

        final TrustManager[] trustManagers = loadTrustManagers(truststore, truststorePasswd, truststoreType);

        final SSLContext context = SSLContext.getInstance("TLS");
        context.init(new KeyManager[0], trustManagers, new SecureRandom());
        return context;
    }

    /**
     * Loads the keystore file and returns key managers initialized from it.
     */
    private static KeyManager[] loadKeyManagers(
            final String path, final char[] passwd, final String type)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
            UnrecoverableKeyException {

        final KeyStore store = KeyStore.getInstance(type);
        try (final InputStream in = new FileInputStream(path)) {
            store.load(in, passwd);
        }
        final KeyManagerFactory factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        factory.init(store, passwd);
        return factory.getKeyManagers();
    }

    /**
     * Loads the truststore file and returns trust managers initialized from it.
     */
    private static TrustManager[] loadTrustManagers(
            final String path, final char[] passwd, final String type)
            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {

        final KeyStore store = KeyStore.getInstance(type);
        try (final InputStream in = new FileInputStream(path)) {
            store.load(in, passwd);
        }
        final TrustManagerFactory factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        factory.init(store);
        return factory.getTrustManagers();
    }

}
See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-commons-parent</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - - <artifactId>nifi-socket-utils</artifactId> - <version>0.0.1-SNAPSHOT</version> - <name>NiFi Socket Utils</name> - <description>Utilities for socket communication</description> - - <dependencies> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-logging-utils</artifactId> - </dependency> - <dependency> - <groupId>commons-net</groupId> - <artifactId>commons-net</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - 
</dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java deleted file mode 100644 index 172c593..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.io.nio; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.nifi.io.nio.consumer.StreamConsumer; -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @author none - */ -public abstract class AbstractChannelReader implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class); - private final String uniqueId; - private final SelectionKey key; - private final BufferPool bufferPool; - private final StreamConsumer consumer; - private final AtomicBoolean isClosed = new AtomicBoolean(false); - private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);//the future on which this reader runs... 
- - public AbstractChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { - this.uniqueId = id; - this.key = key; - this.bufferPool = empties; - this.consumer = consumerFactory.newInstance(id); - consumer.setReturnBufferQueue(bufferPool); - } - - protected void setScheduledFuture(final ScheduledFuture<?> future) { - this.future.set(future); - } - - protected ScheduledFuture<?> getScheduledFuture() { - return future.get(); - } - - protected SelectionKey getSelectionKey() { - return key; - } - - public boolean isClosed() { - return isClosed.get(); - } - - private void closeStream() { - if (isClosed.get()) { - return; - } - try { - isClosed.set(true); - future.get().cancel(false); - key.cancel(); - key.channel().close(); - } catch (final IOException ioe) { - LOGGER.warn("Unable to cleanly close stream due to " + ioe); - } finally { - consumer.signalEndOfStream(); - } - } - - /** - * Allows a subclass to specifically handle how it reads from the given - * key's channel into the given buffer. - * - * @param key - * @param buffer - * @return the number of bytes read in the final read cycle. A value of zero - * or more indicates the channel is still open but a value of -1 indicates - * end of stream. 
- * @throws IOException - */ - protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException; - - @Override - public final void run() { - if (!key.isValid() || consumer.isConsumerFinished()) { - closeStream(); - return; - } - if (!key.isReadable()) { - return;//there is nothing available to read...or we aren't allow to read due to throttling - } - ByteBuffer buffer = null; - try { - buffer = bufferPool.poll(); - if (buffer == null) { - return; // no buffers available - come back later - } - final int bytesRead = fillBuffer(key, buffer); - buffer.flip(); - if (buffer.remaining() > 0) { - consumer.addFilledBuffer(buffer); - buffer = null; //clear the reference - is now the consumer's responsiblity - } else { - buffer.clear(); - bufferPool.returnBuffer(buffer, 0); - buffer = null; //clear the reference - is now back to the queue - } - if (bytesRead < 0) { //we've reached the end - closeStream(); - } - } catch (final Exception ioe) { - closeStream(); - LOGGER.error("Closed channel reader " + this + " due to " + ioe); - } finally { - if (buffer != null) { - buffer.clear(); - bufferPool.returnBuffer(buffer, 0); - } - } - } - - @Override - public final boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - if (obj.getClass() != getClass()) { - return false; - } - AbstractChannelReader rhs = (AbstractChannelReader) obj; - return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals(); - } - - @Override - public final int hashCode() { - return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode(); - } - - @Override - public final String toString() { - return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java 
---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java deleted file mode 100644 index a413ad2..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.io.nio; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @author none - */ -public class BufferPool implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class); - final BlockingQueue<ByteBuffer> bufferPool; - private final static double ONE_MB = 1 << 20; - private Calendar lastRateSampleTime = Calendar.getInstance(); - private final Calendar startTime = Calendar.getInstance(); - double lastRateSampleMBps = -1.0; - double overallMBps = -1.0; - private long totalBytesExtracted = 0L; - private long lastTotalBytesExtracted = 0L; - final double maxRateMBps; - - public BufferPool(final int bufferCount, final int bufferCapacity, final boolean allocateDirect, final double maxRateMBps) { - bufferPool = new LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, allocateDirect)); - this.maxRateMBps = maxRateMBps; - } - - /** - * Returns the given buffer to the pool - and clears it. 
- * - * @param buffer - * @param bytesProcessed - * @return - */ - public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) { - totalBytesExtracted += bytesProcessed; - buffer.clear(); - return bufferPool.add(buffer); - } - - //here we enforce the desired rate we want by restricting access to buffers when we're over rate - public synchronized ByteBuffer poll() { - computeRate(); - final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 0.3); - if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) { - return null; - } - return bufferPool.poll(); - } - - public int size() { - return bufferPool.size(); - } - - private synchronized void computeRate() { - final Calendar now = Calendar.getInstance(); - final long measurementDurationMillis = now.getTimeInMillis() - lastRateSampleTime.getTimeInMillis(); - final double duractionSecs = ((double) measurementDurationMillis) / 1000.0; - if (duractionSecs >= 0.75) { //recompute every 3/4 second or when we're too fast - final long totalDuractionMillis = now.getTimeInMillis() - startTime.getTimeInMillis(); - final double totalDurationSecs = ((double) totalDuractionMillis) / 1000.0; - final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted; - lastTotalBytesExtracted = totalBytesExtracted; - lastRateSampleTime = now; - final double bps = ((double) differenceBytes) / duractionSecs; - final double totalBps = ((double) totalBytesExtracted / totalDurationSecs); - lastRateSampleMBps = bps / ONE_MB; - overallMBps = totalBps / ONE_MB; - } - } - - public static List<ByteBuffer> createBuffers(final int bufferCount, final int bufferCapacity, final boolean allocateDirect) { - final List<ByteBuffer> buffers = new ArrayList<>(); - for (int i = 0; i < bufferCount; i++) { - final ByteBuffer buffer = (allocateDirect) ? 
ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity); - buffers.add(buffer); - } - return buffers; - } - - private void logChannelReadRates() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, totalBytesExtracted)); - } - } - - @Override - public void run() { - computeRate(); - logChannelReadRates(); - } -}