NIFI-2574 Changed NiFiProperties to avoid static initializer and updated all references to it.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7d7401ad Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7d7401ad Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7d7401ad Branch: refs/heads/master Commit: 7d7401add4699cd7d543ce18d11d7c9d6ab6e20c Parents: b3f3648 Author: joewitt <joew...@apache.org> Authored: Mon Aug 15 20:18:47 2016 -0700 Committer: joewitt <joew...@apache.org> Committed: Wed Aug 17 00:10:07 2016 -0700 ---------------------------------------------------------------------- .../ControllerServiceInitializationContext.java | 3 +- .../apache/nifi/kerberos/KerberosContext.java | 51 ++++ .../ProcessorInitializationContext.java | 11 +- .../ReportingInitializationContext.java | 3 +- nifi-commons/nifi-hadoop-utils/pom.xml | 4 - .../apache/nifi/hadoop/KerberosProperties.java | 36 +-- .../nifi/hadoop/TestKerberosProperties.java | 12 +- .../org/apache/nifi/util/NiFiProperties.java | 236 +++++++++-------- .../apache/nifi/util/NiFiPropertiesTest.java | 52 +--- ...kControllerServiceInitializationContext.java | 16 ++ .../MockProcessorInitializationContext.java | 16 ++ .../MockReportingInitializationContext.java | 16 ++ ...kControllerServiceInitializationContext.java | 15 ++ .../MockProcessorInitializationContext.java | 16 ++ .../MockReportingInitializationContext.java | 16 ++ .../nifi/documentation/DocGeneratorTest.java | 35 ++- .../nifi/authorization/FileAuthorizerTest.java | 2 +- .../authorization/util/IdentityMappingUtil.java | 2 +- ...andardClusterCoordinationProtocolSender.java | 8 +- .../heartbeat/AbstractHeartbeatMonitor.java | 39 ++- .../ClusterProtocolHeartbeatMonitor.java | 40 ++- .../http/StandardHttpResponseMerger.java | 34 ++- .../endpoints/StatusHistoryEndpointMerger.java | 18 +- .../ThreadPoolRequestReplicator.java | 12 +- .../node/CuratorNodeProtocolSender.java | 18 +- .../node/NodeClusterCoordinator.java | 100 +++---- .../ThreadPoolRequestReplicatorFactoryBean.java | 
18 +- .../heartbeat/TestAbstractHeartbeatMonitor.java | 14 +- .../http/StandardHttpResponseMergerSpec.groovy | 7 +- .../StatusHistoryEndpointMergerSpec.groovy | 5 +- .../TestThreadPoolRequestReplicator.java | 32 +-- .../node/TestNodeClusterCoordinator.java | 24 +- .../nifi/cluster/integration/Cluster.java | 10 +- .../apache/nifi/cluster/integration/Node.java | 27 +- .../nifi/remote/protocol/ServerProtocol.java | 12 +- .../nifi/connectable/StandardConnection.java | 19 +- .../nifi/controller/FileSystemSwapManager.java | 53 ++-- .../apache/nifi/controller/FlowController.java | 80 +++--- .../nifi/controller/StandardFlowService.java | 103 ++++---- .../controller/StandardFlowSynchronizer.java | 61 ++--- .../nifi/controller/StandardProcessorNode.java | 10 +- .../cluster/ClusterProtocolHeartbeater.java | 20 +- .../cluster/ZooKeeperClientConfig.java | 15 +- .../election/CuratorLeaderElectionManager.java | 31 +-- .../StandardReportingInitializationContext.java | 25 +- .../repository/FileSystemRepository.java | 86 +++--- .../repository/VolatileContentRepository.java | 44 ++-- .../WriteAheadFlowFileRepository.java | 50 ++-- .../scheduling/StandardProcessScheduler.java | 29 ++- .../scheduling/TimerDrivenSchedulingAgent.java | 11 +- ...dControllerServiceInitializationContext.java | 24 +- .../StandardControllerServiceProvider.java | 7 +- .../VolatileComponentStatusRepository.java | 11 +- .../apache/nifi/encrypt/StringEncryptor.java | 21 +- .../StandardXMLFlowConfigurationDAO.java | 8 +- .../StandardProcessorInitializationContext.java | 24 +- .../nifi/remote/StandardRemoteProcessGroup.java | 78 +++--- .../StandardFlowSynchronizerSpec.groovy | 8 +- .../controller/StandardFlowServiceTest.java | 2 +- .../controller/TestFileSystemSwapManager.java | 10 +- .../nifi/controller/TestFlowController.java | 26 +- .../controller/TestStandardProcessorNode.java | 5 +- .../repository/TestFileSystemRepository.java | 101 +++---- .../repository/TestStandardProcessSession.java | 260 
+++++++++---------- .../TestVolatileContentRepository.java | 33 +-- .../TestWriteAheadFlowFileRepository.java | 30 ++- .../scheduling/TestProcessorLifecycle.java | 73 ++++-- .../TestStandardProcessScheduler.java | 64 +++-- .../StandardControllerServiceProviderTest.java | 14 +- .../TestStandardControllerServiceProvider.java | 40 +-- .../zookeeper/TestZooKeeperStateProvider.java | 17 +- .../tasks/TestContinuallyRunProcessorTask.java | 5 +- .../nifi/nar/NarThreadContextClassLoader.java | 31 ++- .../org/apache/nifi/nar/NarUnpackerTest.java | 58 ++--- .../src/main/java/org/apache/nifi/NiFi.java | 2 +- .../security/util/SslServerSocketFactory.java | 4 +- .../security/util/SslSocketFactory.java | 4 +- .../nifi/remote/HttpRemoteSiteListener.java | 30 +-- .../nifi/remote/RemoteResourceFactory.java | 4 +- .../nifi/remote/SocketRemoteSiteListener.java | 19 +- .../nifi/remote/StandardRemoteGroupPort.java | 52 ++-- .../AbstractFlowFileServerProtocol.java | 9 +- .../remote/protocol/FlowFileTransaction.java | 2 +- .../remote/protocol/HandshakeProperties.java | 1 - .../StandardHttpFlowFileServerProtocol.java | 19 +- .../socket/SocketFlowFileServerProtocol.java | 25 +- .../nifi/remote/TestHttpRemoteSiteListener.java | 11 +- .../remote/TestStandardRemoteGroupPort.java | 10 +- .../io/socket/TestSocketChannelStreams.java | 2 +- .../io/socket/ssl/TestSSLSocketChannel.java | 12 +- .../http/TestHttpFlowFileServerProtocol.java | 10 +- .../apache/nifi/web/server/JettyServerTest.java | 33 +-- .../nifi/web/api/DataTransferResource.java | 130 +++++----- .../apache/nifi/web/api/SiteToSiteResource.java | 43 ++- .../accesscontrol/AccessControlHelper.java | 10 +- .../accesscontrol/ITAccessTokenEndpoint.java | 5 +- .../nifi/integration/util/NiFiTestServer.java | 19 +- .../nifi/web/api/TestDataTransferResource.java | 18 +- .../nifi/web/api/TestSiteToSiteResource.java | 4 +- .../OcspCertificateValidatorGroovyTest.groovy | 12 +- .../NiFiAuthenticationProviderTest.java | 2 +- 
.../hadoop/AbstractHadoopProcessor.java | 9 +- .../processors/hadoop/AbstractHadoopTest.java | 5 +- .../nifi/processors/hadoop/GetHDFSTest.java | 2 +- .../nifi/processors/hadoop/PutHDFSTest.java | 2 +- .../hadoop/TestCreateHadoopSequenceFile.java | 2 +- .../nifi/processors/hadoop/TestFetchHDFS.java | 2 +- .../nifi/processors/hadoop/TestListHDFS.java | 2 +- .../hadoop/inotify/TestGetHDFSEvents.java | 2 +- .../nifi/dbcp/hive/HiveConnectionPool.java | 29 ++- .../nifi/processors/hive/PutHiveStreaming.java | 6 +- .../processors/hive/TestPutHiveStreaming.java | 16 +- .../PersistentProvenanceRepository.java | 218 +++++++++------- .../VolatileProvenanceRepository.java | 45 ++-- .../TestVolatileProvenanceRepository.java | 7 +- .../script/InvokeScriptedProcessor.java | 115 +++++--- .../nifi/controller/MonitorDiskUsage.java | 62 +++-- .../nifi/controller/MonitorMemoryTest.java | 25 +- .../nifi/hbase/HBase_1_1_2_ClientService.java | 10 +- .../hbase/TestHBase_1_1_2_ClientService.java | 9 +- 120 files changed, 2034 insertions(+), 1573 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java index 3486621..f6ac9e7 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java @@ -17,9 +17,10 @@ package org.apache.nifi.controller; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; -public interface 
ControllerServiceInitializationContext { +public interface ControllerServiceInitializationContext extends KerberosContext { /** * @return the identifier associated with the {@link ControllerService} with http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java b/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java new file mode 100644 index 0000000..8f7ba19 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kerberos; + +import java.io.File; + +public interface KerberosContext { + + /** + * The Kerberos service principal used by NiFi to communicate with the KDC + * in order to obtain tickets on behalf of NiFi. Typically of the form + * NIFI/fully.qualified.domain@REALM. 
+ * + * @return the principal, or null if this NiFi instance is not configured + * with a NiFi Kerberos service principal + */ + public String getKerberosServicePrincipal(); + + /** + * The File instance for the Kerberos service keytab. The service principal + * and service keytab will be used to communicate with the KDC to obtain + * tickets on behalf of NiFi. + * + * @return the File instance of the service keytab, or null if this NiFi + * instance is not configured with a NiFi Kerberos service keytab + */ + public File getKerberosServiceKeytab(); + + /** + * The Kerberos configuration file (typically krb5.conf) that will be used + * by this JVM during all Kerberos operations. + * + * @return the File instance for the Kerberos configuration file, or null if + * this NiFi instance is not configured with a Kerberos configuration file + */ + public File getKerberosConfigurationFile(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java index 726b3fa..df18193 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java @@ -18,16 +18,17 @@ package org.apache.nifi.processor; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; /** * <p> * The <code>ProcessorInitializationContext</code> provides - * {@link org.apache.nifi.processor.Processor Processor}s access to objects that may be of - * use throughout the life of the Processor. 
+ * {@link org.apache.nifi.processor.Processor Processor}s access to objects that + * may be of use throughout the life of the Processor. * </p> */ -public interface ProcessorInitializationContext { +public interface ProcessorInitializationContext extends KerberosContext { /** * @return the unique identifier for this processor @@ -47,7 +48,9 @@ public interface ProcessorInitializationContext { ControllerServiceLookup getControllerServiceLookup(); /** - * @return the {@link NodeTypeProvider} which can be used to detect the node type of this NiFi instance. + * @return the {@link NodeTypeProvider} which can be used to detect the node + * type of this NiFi instance. */ NodeTypeProvider getNodeTypeProvider(); + } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java index d014b26..df64e03 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.reporting; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -26,7 +27,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; * A ReportingConfiguration provides configuration information to a * ReportingTask at the time of initialization */ -public interface ReportingInitializationContext { +public interface ReportingInitializationContext extends KerberosContext { /** * @return the identifier for this ReportingTask 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-commons/nifi-hadoop-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml b/nifi-commons/nifi-hadoop-utils/pom.xml index 5f18400..8de849e 100644 --- a/nifi-commons/nifi-hadoop-utils/pom.xml +++ b/nifi-commons/nifi-hadoop-utils/pom.xml @@ -35,10 +35,6 @@ <artifactId>nifi-processor-utils</artifactId> </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java index 5e8fb7d..c7743f4 100644 --- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java @@ -23,20 +23,20 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.StringUtils; import java.io.File; import java.util.ArrayList; import java.util.List; /** - * All processors and controller services that need properties for Kerberos Principal and Keytab - * should obtain them through this class by calling: + * All processors and controller services that need properties for Kerberos + * Principal and Keytab should obtain them through this class by 
calling: * - * KerberosProperties props = KerberosProperties.create(NiFiProperties.getInstance()) + * KerberosProperties props = + * KerberosProperties.create(NiFiProperties.getInstance()) * - * The properties can be accessed from the resulting KerberosProperties instance. + * The properties can be accessed from the resulting KerberosProperties + * instance. */ public class KerberosProperties { @@ -45,7 +45,14 @@ public class KerberosProperties { private final PropertyDescriptor kerberosPrincipal; private final PropertyDescriptor kerberosKeytab; - private KerberosProperties(final File kerberosConfigFile) { + /** + * Instantiate a KerberosProperties object but keep in mind it is + * effectively a singleton because the krb5.conf file needs to be set as a + * system property which this constructor will take care of. + * + * @param kerberosConfigFile file of krb5.conf + */ + public KerberosProperties(final File kerberosConfigFile) { this.kerberosConfigFile = kerberosConfigFile; if (this.kerberosConfigFile != null) { @@ -91,13 +98,6 @@ public class KerberosProperties { .build(); } - public static KerberosProperties create(final NiFiProperties niFiProperties) { - if (niFiProperties == null) { - throw new IllegalArgumentException("NiFiProperties can not be null"); - } - return new KerberosProperties(niFiProperties.getKerberosConfigurationFile()); - } - public File getKerberosConfigFile() { return kerberosConfigFile; } @@ -120,7 +120,8 @@ public class KerberosProperties { // if security is enabled then the keytab and principal are required final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled(config); - if (isSecurityEnabled && StringUtils.isBlank(principal)) { + final boolean blankPrincipal = (principal == null || principal.isEmpty()); + if (isSecurityEnabled && blankPrincipal) { results.add(new ValidationResult.Builder() .valid(false) .subject(subject) @@ -128,7 +129,8 @@ public class KerberosProperties { .build()); } - if (isSecurityEnabled && 
StringUtils.isBlank(keytab)) { + final boolean blankKeytab = (keytab == null || keytab.isEmpty()); + if (isSecurityEnabled && blankKeytab) { results.add(new ValidationResult.Builder() .valid(false) .subject(subject) @@ -136,7 +138,7 @@ public class KerberosProperties { .build()); } - if (!isSecurityEnabled && (!StringUtils.isBlank(principal) || !StringUtils.isBlank(keytab))) { + if (!isSecurityEnabled && (!blankPrincipal || !blankKeytab)) { logger.warn("Configuration does not have security enabled, Keytab and Principal will be ignored"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java index 131fe65..8cd1ea1 100644 --- a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java +++ b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java @@ -19,7 +19,6 @@ package org.apache.nifi.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.util.NiFiProperties; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -27,17 +26,13 @@ import org.mockito.Mockito; import java.io.File; import java.util.List; -import static org.mockito.Mockito.when; - public class TestKerberosProperties { @Test public void testWithKerberosConfigFile() { final File file = new File("src/test/resources/krb5.conf"); - final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); - when(niFiProperties.getKerberosConfigurationFile()).thenReturn(file); - final KerberosProperties 
kerberosProperties = KerberosProperties.create(niFiProperties); + final KerberosProperties kerberosProperties = new KerberosProperties(file); Assert.assertNotNull(kerberosProperties); Assert.assertNotNull(kerberosProperties.getKerberosConfigFile()); @@ -51,10 +46,9 @@ public class TestKerberosProperties { @Test public void testWithoutKerberosConfigFile() { - final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); - when(niFiProperties.getKerberosConfigurationFile()).thenReturn(null); + final File file = new File("src/test/resources/krb5.conf"); - final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + final KerberosProperties kerberosProperties = new KerberosProperties(null); Assert.assertNotNull(kerberosProperties); Assert.assertNull(kerberosProperties.getKerberosConfigFile()); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index bbb3998..d381f74 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; @@ -29,17 +26,22 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; - -public class NiFiProperties extends Properties { - - private static final long 
serialVersionUID = 2119177359005492702L; - - private static final Logger LOG = LoggerFactory.getLogger(NiFiProperties.class); - private static NiFiProperties instance = null; +import java.util.Set; + +/** + * The NiFiProperties class holds all properties which are needed for various + * values to be available at runtime. It is strongly tied to the startup + * properties needed and is often referred to as the 'nifi.properties' file. The + * properties contain keys and values. Great care should be taken in leveraging + * this class or passing it along. Its use should be refactored and minimized + * over time. + */ +public abstract class NiFiProperties { // core properties public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; @@ -174,7 +176,6 @@ public class NiFiProperties extends Properties { public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout"; public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node"; - // kerberos properties public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file"; public static final String KERBEROS_SERVICE_PRINCIPAL = "nifi.kerberos.service.principal"; @@ -243,68 +244,20 @@ public class NiFiProperties extends Properties { // Kerberos defaults public static final String DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION = "12 hours"; - private NiFiProperties() { - super(); - } - - public NiFiProperties copy() { - final NiFiProperties copy = new NiFiProperties(); - copy.putAll(this); - return copy; - } + /** + * Retrieves the property value for the given property key + * + * @param key the key of property value to lookup. + * @return value of property at given key or null if not found + */ + public abstract String getProperty(String key); /** - * Factory method to create an instance of the {@link NiFiProperties}. This - * method employs a standard singleton pattern by caching the instance if it - * was already obtained + * Retrieves all known property keys.
* - * @return instance of {@link NiFiProperties} + * @return all known property keys. */ - public static synchronized NiFiProperties getInstance() { - // NOTE: unit tests can set instance to null (with reflection) to effectively create a new singleton. - // changing the below as a check for whether the instance was initialized will break those - // unit tests. - if (null == instance) { - final NiFiProperties suspectInstance = new NiFiProperties(); - final String nfPropertiesFilePath = System - .getProperty(NiFiProperties.PROPERTIES_FILE_PATH); - if (null == nfPropertiesFilePath || nfPropertiesFilePath.trim().length() == 0) { - throw new RuntimeException("Requires a system property called \'" - + NiFiProperties.PROPERTIES_FILE_PATH - + "\' and this is not set or has no value"); - } - final File propertiesFile = new File(nfPropertiesFilePath); - if (!propertiesFile.exists()) { - throw new RuntimeException("Properties file doesn't exist \'" - + propertiesFile.getAbsolutePath() + "\'"); - } - if (!propertiesFile.canRead()) { - throw new RuntimeException("Properties file exists but cannot be read \'" - + propertiesFile.getAbsolutePath() + "\'"); - } - InputStream inStream = null; - try { - inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); - suspectInstance.load(inStream); - } catch (final Exception ex) { - LOG.error("Cannot load properties file due to " + ex.getLocalizedMessage()); - throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); - } finally { - if (null != inStream) { - try { - inStream.close(); - } catch (final Exception ex) { - /** - * do nothing * - */ - } - } - } - instance = suspectInstance; - } - return instance; - } + public abstract Set<String> getPropertyKeys(); // getters for core properties // public File getFlowConfigurationFile() { @@ -395,7 +348,8 @@ public class NiFiProperties extends Properties { } /** - * The host name that will be given out to clients to connect to the Remote 
Input Port. + * The host name that will be given out to clients to connect to the Remote + * Input Port. * * @return the remote input host name or null if not configured */ @@ -443,7 +397,9 @@ public class NiFiProperties extends Properties { /** * The HTTP or HTTPS Web API port for a Remote Input Port. - * @return the remote input port for HTTP(S) communication, or null if HTTP(S) Site-to-Site is not enabled + * + * @return the remote input port for HTTP(S) communication, or null if + * HTTP(S) Site-to-Site is not enabled */ public Integer getRemoteInputHttpPort() { if (!isSiteToSiteHttpEnabled()) { @@ -479,7 +435,8 @@ public class NiFiProperties extends Properties { } /** - * Returns whether the processors should be started automatically when the application loads. + * Returns whether the processors should be started automatically when the + * application loads. * * @return Whether to auto start the processors or not */ @@ -490,7 +447,8 @@ public class NiFiProperties extends Properties { } /** - * Returns the number of partitions that should be used for the FlowFile Repository + * Returns the number of partitions that should be used for the FlowFile + * Repository * * @return the number of partitions */ @@ -501,7 +459,8 @@ public class NiFiProperties extends Properties { } /** - * Returns the number of milliseconds between FlowFileRepository checkpointing + * Returns the number of milliseconds between FlowFileRepository + * checkpointing * * @return the number of milliseconds between checkpoint events */ @@ -608,7 +567,7 @@ public class NiFiProperties extends Properties { List<Path> narLibraryPaths = new ArrayList<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (String propertyName : getPropertyKeys()) { // determine if the property is a nar library path if (StringUtils.startsWith(propertyName, NAR_LIBRARY_DIRECTORY_PREFIX) || NAR_LIBRARY_DIRECTORY.equals(propertyName)) { @@ -684,7 +643,6 @@ public class NiFiProperties 
extends Properties { return file; } - // getters for cluster node properties // public boolean isNode() { return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); @@ -719,7 +677,6 @@ public class NiFiProperties extends Properties { } } - public boolean isClustered() { return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); } @@ -779,7 +736,8 @@ public class NiFiProperties extends Properties { } /** - * Returns true if the Kerberos service principal and keytab location properties are populated. + * Returns true if the Kerberos service principal and keytab location + * properties are populated. * * @return true if Kerberos service support is enabled */ @@ -788,12 +746,14 @@ public class NiFiProperties extends Properties { } /** - * Returns true if client certificates are required for REST API. Determined if the following conditions are all true: + * Returns true if client certificates are required for REST API. Determined + * if the following conditions are all true: * - * - login identity provider is not populated - * - Kerberos service support is not enabled + * - login identity provider is not populated - Kerberos service support is + * not enabled * - * @return true if client certificates are required for access to the REST API + * @return true if client certificates are required for access to the REST + * API */ public boolean isClientAuthRequiredForRestApi() { return StringUtils.isBlank(getProperty(NiFiProperties.SECURITY_USER_LOGIN_IDENTITY_PROVIDER)) && !isKerberosServiceSupportEnabled(); @@ -839,7 +799,8 @@ public class NiFiProperties extends Properties { } /** - * Returns the database repository path. It simply returns the value configured. No directories will be created as a result of this operation. + * Returns the database repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. 
* * @return database repository path * @throws InvalidPathException If the configured path is invalid @@ -849,7 +810,8 @@ public class NiFiProperties extends Properties { } /** - * Returns the flow file repository path. It simply returns the value configured. No directories will be created as a result of this operation. + * Returns the flow file repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. * * @return database repository path * @throws InvalidPathException If the configured path is invalid @@ -859,8 +821,10 @@ public class NiFiProperties extends Properties { } /** - * Returns the content repository paths. This method returns a mapping of file repository name to file repository paths. It simply returns the values configured. No directories will be created as - * a result of this operation. + * Returns the content repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. * * @return file repositories paths * @throws InvalidPathException If any of the configured paths are invalid @@ -869,7 +833,7 @@ public class NiFiProperties extends Properties { final Map<String, Path> contentRepositoryPaths = new HashMap<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (String propertyName : getPropertyKeys()) { // determine if the property is a file repository path if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { // get the repository key @@ -884,8 +848,10 @@ public class NiFiProperties extends Properties { } /** - * Returns the provenance repository paths. This method returns a mapping of file repository name to file repository paths. It simply returns the values configured. No directories will be created - * as a result of this operation. + * Returns the provenance repository paths. 
This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. * * @return the name and paths of all provenance repository locations */ @@ -893,7 +859,7 @@ public class NiFiProperties extends Properties { final Map<String, Path> provenanceRepositoryPaths = new HashMap<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (String propertyName : getPropertyKeys()) { // determine if the property is a file repository path if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) { // get the repository key @@ -919,17 +885,9 @@ public class NiFiProperties extends Properties { return getProperty(MAX_APPENDABLE_CLAIM_SIZE); } - @Override public String getProperty(final String key, final String defaultValue) { - final String value = super.getProperty(key, defaultValue); - if (value == null) { - return null; - } - - if (value.trim().isEmpty()) { - return defaultValue; - } - return value; + final String value = getProperty(key); + return (value == null || value.trim().isEmpty()) ? 
defaultValue : value; } public String getBoredYieldDuration() { @@ -973,7 +931,7 @@ public class NiFiProperties extends Properties { return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE); } - public String getVariableRegistryProperties(){ + public String getVariableRegistryProperties() { return getProperty(VARIABLE_REGISTRY_PROPERTIES); } @@ -981,18 +939,88 @@ public class NiFiProperties extends Properties { final List<Path> vrPropertiesPaths = new ArrayList<>(); final String vrPropertiesFiles = getVariableRegistryProperties(); - if(!StringUtils.isEmpty(vrPropertiesFiles)) { + if (!StringUtils.isEmpty(vrPropertiesFiles)) { final List<String> vrPropertiesFileList = Arrays.asList(vrPropertiesFiles.split(",")); - for(String propertiesFile : vrPropertiesFileList){ + for (String propertiesFile : vrPropertiesFileList) { vrPropertiesPaths.add(Paths.get(propertiesFile)); } - return vrPropertiesPaths.toArray( new Path[vrPropertiesPaths.size()]); + return vrPropertiesPaths.toArray(new Path[vrPropertiesPaths.size()]); } else { return new Path[]{}; } } + public int size() { + return getPropertyKeys().size(); + } + + /** + * Creates an instance of NiFiProperties. This should likely not be called + * by any classes outside of the NiFi framework but can be useful by the + * framework for default property loading behavior or helpful in tests + * needing to create specific instances of NiFiProperties. If properties + * file specified cannot be found/read a runtime exception will be thrown. + * If one is not specified no properties will be loaded by default. + * + * @param propertiesFilePath if provided properties will be loaded from + * given file; else will be loaded from System property. Can be null. + * @param additionalProperties allows overriding of properties with the + * supplied values. these will be applied after loading from any properties + * file. Can be null or empty. 
+ * @return NiFiProperties + */ + public static NiFiProperties createBasicNiFiProperties(final String propertiesFilePath, final Map<String, String> additionalProperties) { + final Map<String, String> addProps = (additionalProperties == null) ? Collections.EMPTY_MAP : additionalProperties; + final Properties properties = new Properties(); + final String nfPropertiesFilePath = (propertiesFilePath == null) + ? System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH) + : propertiesFilePath; + if (nfPropertiesFilePath != null) { + final File propertiesFile = new File(nfPropertiesFilePath.trim()); + if (!propertiesFile.exists()) { + throw new RuntimeException("Properties file doesn't exist \'" + + propertiesFile.getAbsolutePath() + "\'"); + } + if (!propertiesFile.canRead()) { + throw new RuntimeException("Properties file exists but cannot be read \'" + + propertiesFile.getAbsolutePath() + "\'"); + } + InputStream inStream = null; + try { + inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); + properties.load(inStream); + } catch (final Exception ex) { + throw new RuntimeException("Cannot load properties file due to " + + ex.getLocalizedMessage(), ex); + } finally { + if (null != inStream) { + try { + inStream.close(); + } catch (final Exception ex) { + /** + * do nothing * + */ + } + } + } + } + addProps.entrySet().stream().forEach((entry) -> { + properties.setProperty(entry.getKey(), entry.getValue()); + }); + return new NiFiProperties() { + @Override + public String getProperty(String key) { + return properties.getProperty(key); + } + + @Override + public Set<String> getPropertyKeys() { + return properties.stringPropertyNames(); + } + }; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java ---------------------------------------------------------------------- diff --git 
a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java index 96a618e..3547b92 100644 --- a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java +++ b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java @@ -19,10 +19,7 @@ package org.apache.nifi.util; import org.junit.Assert; import org.junit.Test; -import java.io.BufferedInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; import java.net.URISyntaxException; import java.nio.file.Path; import java.util.HashSet; @@ -36,7 +33,7 @@ public class NiFiPropertiesTest { @Test public void testProperties() { - NiFiProperties properties = loadSpecifiedProperties("/NiFiProperties/conf/nifi.properties"); + NiFiProperties properties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties"); assertEquals("UI Banner Text", properties.getBannerText()); @@ -58,7 +55,7 @@ public class NiFiPropertiesTest { @Test public void testMissingProperties() { - NiFiProperties properties = loadSpecifiedProperties("/NiFiProperties/conf/nifi.missing.properties"); + NiFiProperties properties = loadNiFiProperties("/NiFiProperties/conf/nifi.missing.properties"); List<Path> directories = properties.getNarLibraryDirectories(); @@ -72,7 +69,7 @@ public class NiFiPropertiesTest { @Test public void testBlankProperties() { - NiFiProperties properties = loadSpecifiedProperties("/NiFiProperties/conf/nifi.blank.properties"); + NiFiProperties properties = loadNiFiProperties("/NiFiProperties/conf/nifi.blank.properties"); List<Path> directories = properties.getNarLibraryDirectories(); @@ -83,45 +80,14 @@ public class NiFiPropertiesTest { } - private NiFiProperties loadSpecifiedProperties(String propertiesFile) { - - String filePath; + private NiFiProperties loadNiFiProperties(final String propsPath) { + String realPath = 
null; try { - filePath = NiFiPropertiesTest.class.getResource(propertiesFile).toURI().getPath(); - } catch (URISyntaxException ex) { - throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); - } - - System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, filePath); - - NiFiProperties properties = NiFiProperties.getInstance(); - - // clear out existing properties - for (String prop : properties.stringPropertyNames()) { - properties.remove(prop); + realPath = NiFiPropertiesTest.class.getResource(propsPath).toURI().getPath(); + } catch (final URISyntaxException ex) { + throw new RuntimeException(ex); } - - InputStream inStream = null; - try { - inStream = new BufferedInputStream(new FileInputStream(filePath)); - properties.load(inStream); - } catch (final Exception ex) { - throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); - } finally { - if (null != inStream) { - try { - inStream.close(); - } catch (final Exception ex) { - /** - * do nothing * - */ - } - } - } - - return properties; + return NiFiProperties.createBasicNiFiProperties(realPath, null); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java index f8f4f59..ffa3546 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import java.io.File; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import 
org.apache.nifi.controller.ControllerServiceInitializationContext; @@ -68,4 +69,19 @@ public class MockControllerServiceInitializationContext extends MockControllerSe public StateManager getStateManager() { return stateManager; } + + @Override + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index 132869c..e44e731 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import java.io.File; import java.util.Set; import java.util.UUID; @@ -86,4 +87,19 @@ public class MockProcessorInitializationContext implements ProcessorInitializati public NodeTypeProvider getNodeTypeProvider() { return context; } + + @Override + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. 
+ } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java index 0aea00a..454b742 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -86,4 +87,19 @@ public class MockReportingInitializationContext extends MockControllerServiceLoo public ComponentLog getLogger() { return logger; } + + @Override + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. 
+ } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java index abbde61..195934b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import java.io.File; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceLookup; @@ -49,4 +50,18 @@ public class MockControllerServiceInitializationContext implements ControllerSer return null; } + @Override + public String getKerberosServicePrincipal() { + return null; + } + + @Override + public File getKerberosServiceKeytab() { + return null; + } + + @Override + public File getKerberosConfigurationFile() { + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java index 3935124..d86ea4c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import java.io.File; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; @@ -48,4 +49,19 @@ public class MockProcessorInitializationContext implements ProcessorInitializati public NodeTypeProvider getNodeTypeProvider() { return new MockNodeTypeProvider(); } + + @Override + public String getKerberosServicePrincipal() { + return null; + } + + @Override + public File getKerberosServiceKeytab() { + return null; + } + + @Override + public File getKerberosConfigurationFile() { + return null; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java index 
ebf59d6..49d7933 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import java.io.File; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerServiceLookup; @@ -64,4 +65,19 @@ public class MockReportingInitializationContext implements ReportingInitializati public ComponentLog getLogger() { return new MockComponentLogger(); } + + @Override + public String getKerberosServicePrincipal() { + return null; + } + + @Override + public File getKerberosServiceKeytab() { + return null; + } + + @Override + public File getKerberosConfigurationFile() { + return null; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java index afc3d50..b7c4db9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java @@ -21,6 +21,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Properties; +import 
java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.nifi.nar.ExtensionManager; @@ -38,8 +40,9 @@ public class DocGeneratorTest { TemporaryFolder temporaryFolder = new TemporaryFolder(); temporaryFolder.create(); - NiFiProperties properties = loadSpecifiedProperties("/conf/nifi.properties"); - properties.setProperty(NiFiProperties.COMPONENT_DOCS_DIRECTORY, temporaryFolder.getRoot().getAbsolutePath()); + NiFiProperties properties = loadSpecifiedProperties("/conf/nifi.properties", + NiFiProperties.COMPONENT_DOCS_DIRECTORY, + temporaryFolder.getRoot().getAbsolutePath()); NarUnpacker.unpackNars(properties); @@ -60,22 +63,16 @@ public class DocGeneratorTest { Assert.assertTrue(generatedHtml.contains("resources")); } - private NiFiProperties loadSpecifiedProperties(String propertiesFile) { + private NiFiProperties loadSpecifiedProperties(final String propertiesFile, final String key, final String value) { String file = DocGeneratorTest.class.getResource(propertiesFile).getFile(); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, file); - NiFiProperties properties = NiFiProperties.getInstance(); - - // clear out existing properties - for (String prop : properties.stringPropertyNames()) { - properties.remove(prop); - } - + final Properties props = new Properties(); InputStream inStream = null; try { inStream = new BufferedInputStream(new FileInputStream(file)); - properties.load(inStream); + props.load(inStream); } catch (final Exception ex) { throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex); @@ -91,6 +88,20 @@ public class DocGeneratorTest { } } - return properties; + if (key != null && value != null) { + props.setProperty(key, value); + } + + return new NiFiProperties() { + @Override + public String getProperty(String key) { + return props.getProperty(key); + } + + @Override + public Set<String> getPropertyKeys() { + return props.stringPropertyNames(); + } + }; } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java index 565e55a..fcedfc8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java @@ -1468,7 +1468,7 @@ public class FileAuthorizerTest { private NiFiProperties getNiFiProperties(final Properties properties) { final NiFiProperties nifiProperties = Mockito.mock(NiFiProperties.class); - when(nifiProperties.stringPropertyNames()).thenReturn(properties.stringPropertyNames()); + when(nifiProperties.getPropertyKeys()).thenReturn(properties.stringPropertyNames()); when(nifiProperties.getProperty(anyString())).then(new Answer<String>() { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java index 
2485db5..f7087ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java @@ -43,7 +43,7 @@ public class IdentityMappingUtil { final List<IdentityMapping> mappings = new ArrayList<>(); // go through each property - for (String propertyName : properties.stringPropertyNames()) { + for (String propertyName : properties.getPropertyKeys()) { if (StringUtils.startsWith(propertyName, NiFiProperties.SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX)) { final String key = StringUtils.substringAfter(propertyName, NiFiProperties.SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX); final String identityPattern = properties.getProperty(propertyName); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index b9ff082..167ddec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -48,7 +48,6 @@ import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +62,7 @@ import org.slf4j.LoggerFactory; * */ public class StandardClusterCoordinationProtocolSender implements ClusterCoordinationProtocolSender { + private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class); private final ProtocolContext<ProtocolMessage> protocolContext; @@ -70,10 +70,6 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin private final int maxThreadsPerRequest; private int handshakeTimeoutSeconds; - public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - this(socketConfiguration, protocolContext, NiFiProperties.getInstance().getClusterNodeProtocolThreads()); - } - public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final int maxThreadsPerRequest) { if (socketConfiguration == null) { throw new IllegalArgumentException("Socket configuration may not be null."); @@ -90,7 +86,6 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin public void setBulletinRepository(final BulletinRepository bulletinRepository) { } - /** * Requests a node to reconnect to the cluster. The configured value for * handshake timeout is applied to the socket before making the request. 
@@ -158,7 +153,6 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin } } - private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout if (handshakeTimeoutSeconds >= 0) { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index c216ed3..d2db59e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.nifi.cluster.coordination.heartbeat; import org.apache.nifi.cluster.coordination.ClusterCoordinator; @@ -31,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; -import java.util.Properties; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -45,11 +43,10 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { private volatile ScheduledFuture<?> future; private volatile boolean stopped = true; - - public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) { + public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final NiFiProperties nifiProperties) { this.clusterCoordinator = clusterCoordinator; - final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, - NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + final String heartbeatInterval = nifiProperties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, + NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); } @@ -118,8 +115,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { } /** - * Fetches all of the latest heartbeats and updates the Cluster Coordinator as appropriate, - * based on the heartbeats received. + * Fetches all of the latest heartbeats and updates the Cluster Coordinator + * as appropriate, based on the heartbeats received. * * Visible for testing. 
*/ @@ -145,7 +142,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { processHeartbeat(heartbeat); } catch (final Exception e) { clusterCoordinator.reportEvent(null, Severity.ERROR, - "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e); + "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e); logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString()); logger.error("", e); } @@ -162,7 +159,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp()); clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, - "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); + "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); try { removeHeartbeat(heartbeat.getNodeIdentifier()); @@ -201,8 +198,8 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) { // Cluster Coordinator believes that node is connected, but node does not believe so. clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster," - + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState() - + "). Marking as Disconnected and requesting that Node reconnect to cluster"); + + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState() + + "). 
Marking as Disconnected and requesting that Node reconnect to cluster"); clusterCoordinator.requestNodeConnect(nodeId, null); return; } @@ -220,7 +217,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { case NOT_YET_CONNECTED: case STARTUP_FAILURE: { clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " - + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); + + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); clusterCoordinator.requestNodeConnect(nodeId, null); break; @@ -260,23 +257,25 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles()); } - /** - * @return the most recent heartbeat information for each node in the cluster + * @return the most recent heartbeat information for each node in the + * cluster */ protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats(); /** - * This method does nothing in the abstract class but is meant for subclasses to - * override in order to provide functionality when the monitor is started. + * This method does nothing in the abstract class but is meant for + * subclasses to override in order to provide functionality when the monitor + * is started. */ protected void onStart() { } /** - * This method does nothing in the abstract class but is meant for subclasses to - * override in order to provide functionality when the monitor is stopped. + * This method does nothing in the abstract class but is meant for + * subclasses to override in order to provide functionality when the monitor + * is stopped. 
*/ protected void onStop() { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index d2d81d1..95b2045 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.heartbeat; import java.nio.charset.StandardCharsets; @@ -22,7 +21,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -54,10 +52,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and then relies on the NiFi Cluster - * Protocol to receive heartbeat messages from nodes in the cluster. 
+ * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and + * then relies on the NiFi Cluster Protocol to receive heartbeat messages from + * nodes in the cluster. */ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler { + protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class); private static final String COORDINATOR_ZNODE_NAME = "coordinator"; @@ -81,30 +81,29 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im } } - - public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final ProtocolListener protocolListener, final Properties properties) { - super(clusterCoordinator, properties); + public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final ProtocolListener protocolListener, final NiFiProperties nifiProperties) { + super(clusterCoordinator, nifiProperties); protocolListener.addHandler(this); - this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); + this.zkClientConfig = ZooKeeperClientConfig.createConfig(nifiProperties); this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes"); - String hostname = properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); + String hostname = nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); if (hostname == null || hostname.trim().isEmpty()) { hostname = "localhost"; } - final String port = properties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT); + final String port = nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT); if (port == null || port.trim().isEmpty()) { throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the '" - + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is not set"); + + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is not set"); } try 
{ Integer.parseInt(port); } catch (final NumberFormatException nfe) { throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the '" - + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number."); + + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number."); } heartbeatAddress = hostname + ":" + port; @@ -114,12 +113,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im public void onStart() { final RetryPolicy retryPolicy = new RetryForever(5000); curatorClient = CuratorFrameworkFactory.builder() - .connectString(zkClientConfig.getConnectString()) - .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis()) - .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis()) - .retryPolicy(retryPolicy) - .defaultData(new byte[0]) - .build(); + .connectString(zkClientConfig.getConnectString()) + .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); curatorClient.start(); // We don't know what the heartbeats look like for the nodes, since we were just elected to monitoring @@ -130,7 +129,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im heartbeatMessages.clear(); for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) { final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()); + clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()); heartbeatMessages.put(nodeId, heartbeat); } @@ -199,7 +198,6 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im return new HashSet<>(clusterNodeIds.values()); } - @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { if (msg.getType() != MessageType.HEARTBEAT) { @@ -220,7 +218,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im final long systemStartTime = payload.getSystemStartTime(); final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); + connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat); logger.debug("Received new heartbeat from {}", nodeId); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java index 784eb2f..7b1d8f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.nifi.cluster.coordination.http; import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; @@ -71,13 +70,25 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; public class StandardHttpResponseMerger implements HttpResponseMerger { + private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class); - private static final List<EndpointResponseMerger> endpointMergers = new ArrayList<>(); - static { + private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>(); + + public StandardHttpResponseMerger(final NiFiProperties nifiProperties) { + final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } endpointMergers.add(new ControllerStatusEndpointMerger()); endpointMergers.add(new ControllerBulletinsEndpointMerger()); endpointMergers.add(new GroupStatusEndpointMerger()); @@ -108,7 +119,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new ListFlowFilesEndpointMerger()); endpointMergers.add(new ComponentStateEndpointMerger()); endpointMergers.add(new BulletinBoardEndpointMerger()); - endpointMergers.add(new StatusHistoryEndpointMerger()); + endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis)); endpointMergers.add(new SystemDiagnosticsEndpointMerger()); endpointMergers.add(new CountersEndpointMerger()); endpointMergers.add(new 
FlowMerger()); @@ -122,9 +133,6 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new FunnelsEndpointMerger()); } - public StandardHttpResponseMerger() { - } - @Override public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) { if (nodeResponses.size() == 1) { @@ -170,7 +178,6 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { return response; } - @Override public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) { // Check if there are any 2xx responses @@ -190,7 +197,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { return getEndpointResponseMerger(uri, httpMethod) != null; } - private static EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { + private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null); } @@ -198,13 +205,12 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { return allResponses.stream().anyMatch(p -> p.is2xx()); } - private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) { responses.stream() - .parallel() // parallelize the draining of the responses, since we have multiple streams to consume - .filter(response -> response != exclude) // don't include the explicitly excluded node - .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content - .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out + .parallel() // parallelize the draining of the responses, since we have multiple streams to consume + .filter(response -> response != exclude) // don't include the explicitly 
excluded node + .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content + .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out } private void drainResponse(final NodeResponse response) { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index 507c1fb..ddd8759 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.nifi.cluster.coordination.http.endpoints; import java.net.URI; @@ -26,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; @@ -40,14 +38,13 @@ import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescrip import org.apache.nifi.controller.status.history.StandardStatusSnapshot; import org.apache.nifi.controller.status.history.StatusHistoryUtil; import org.apache.nifi.controller.status.history.StatusSnapshot; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.apache.nifi.web.api.entity.StatusHistoryEntity; public class StatusHistoryEndpointMerger implements EndpointResponseMerger { + public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history"); public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history"); public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history"); @@ -55,17 +52,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { private final long componentStatusSnapshotMillis; - - public StatusHistoryEndpointMerger() { - final NiFiProperties properties = NiFiProperties.getInstance(); - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); - long snapshotMillis; - try { - 
snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); - } - componentStatusSnapshotMillis = snapshotMillis; + public StatusHistoryEndpointMerger(final long componentStatusSnapshotMillis) { + this.componentStatusSnapshotMillis = componentStatusSnapshotMillis; } private Map<String, MetricDescriptor<?>> getMetricDescriptors(final URI uri) {