This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new badbc1aca6 NIFI-12022 Extract verification logic from JMSConnectionFactoryProvider badbc1aca6 is described below commit badbc1aca66684187093ff82fde6bfb28db549c7 Author: Nandor Soma Abonyi <nsabo...@apache.org> AuthorDate: Mon Aug 28 10:39:30 2023 +0200 NIFI-12022 Extract verification logic from JMSConnectionFactoryProvider This closes #7667. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../cf/AbstractJMSConnectionFactoryProvider.java} | 66 ++--------- .../jms/cf/CachedJMSConnectionFactoryHandler.java | 53 +++++++++ .../cf/JMSConnectionFactoryHandlerDefinition.java} | 20 +--- .../nifi/jms/cf/JMSConnectionFactoryHandler.java | 80 ++++++------- .../nifi/jms/cf/JMSConnectionFactoryProvider.java | 126 +-------------------- .../jms/cf/JndiJmsConnectionFactoryHandler.java | 24 +--- .../jms/cf/JMSConnectionFactoryHandlerForTest.java | 6 +- .../cf/JMSConnectionFactoryProviderForTest.java | 1 - 8 files changed, 111 insertions(+), 265 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/AbstractJMSConnectionFactoryProvider.java similarity index 64% copy from nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java copy to nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/AbstractJMSConnectionFactoryProvider.java index 8a8c4b12fe..a2be311727 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/AbstractJMSConnectionFactoryProvider.java @@ -16,23 +16,14 @@ */ package org.apache.nifi.jms.cf; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.Restricted; -import org.apache.nifi.annotation.behavior.Restriction; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.RequiredPermission; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.VerifiableControllerService; +import org.apache.nifi.logging.ComponentLog; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -45,52 +36,19 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; /** - * Provides a factory service that creates and initializes - * {@link ConnectionFactory} specific to the third party JMS system. - * <p> - * It accomplishes it by adjusting current classpath by adding to it the - * additional resources (i.e., JMS client libraries) provided by the user via - * {@link JMSConnectionFactoryProperties#JMS_CLIENT_LIBRARIES}, allowing it then to create an instance of the - * target {@link ConnectionFactory} based on the provided - * {@link JMSConnectionFactoryProperties#JMS_CONNECTION_FACTORY_IMPL} which can be than access via - * {@link #getConnectionFactory()} method. + * Base JMS controller service implementation that provides verification logic. */ -@Tags({"jms", "messaging", "integration", "queue", "topic", "publish", "subscribe"}) -@CapabilityDescription("Provides a generic service to create vendor specific javax.jms.ConnectionFactory implementations. " - + "The Connection Factory can be served once this service is configured successfully.") -@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.", - description = "The properties that are set following Java Beans convention where a property name is derived from the 'set*' method of the vendor " - + "specific ConnectionFactory's implementation. For example, 'com.ibm.mq.jms.MQConnectionFactory.setChannel(String)' would imply 'channel' " - + "property and 'com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)' would imply 'transportType' property.", - expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) -@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS"}) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.REFERENCE_REMOTE_RESOURCES, - explanation = "Client Library Location can reference resources over HTTP" - ) - } -) -public class JMSConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition, VerifiableControllerService { +public abstract class AbstractJMSConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition, VerifiableControllerService { private static final String ESTABLISH_CONNECTION = "Establish Connection"; private static final String VERIFY_JMS_INTERACTION = "Verify JMS Interaction"; - protected volatile JMSConnectionFactoryHandler delegate; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return JMSConnectionFactoryProperties.getPropertyDescriptors(); - } + protected volatile JMSConnectionFactoryHandlerDefinition delegate; - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return JMSConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName); - } + protected abstract JMSConnectionFactoryHandlerDefinition createConnectionFactoryHandler(ConfigurationContext context, ComponentLog logger); @OnEnabled public void onEnabled(ConfigurationContext context) { - delegate = new JMSConnectionFactoryHandler(context, getLogger()); + delegate = createConnectionFactoryHandler(context, getLogger()); } @OnDisabled @@ -111,7 +69,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl @Override public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) { final List<ConfigVerificationResult> results = new ArrayList<>(); - final JMSConnectionFactoryHandler handler = new JMSConnectionFactoryHandler(context, verificationLogger); + final IJMSConnectionFactoryProvider handler = createConnectionFactoryHandler(context, verificationLogger); final AtomicReference<Exception> failureReason = new AtomicReference<>(); final ExceptionListener listener = failureReason::set; @@ -152,7 +110,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl results.add(new ConfigVerificationResult.Builder() .verificationStepName(ESTABLISH_CONNECTION) .outcome(Outcome.SKIPPED) - .explanation("Could not establish a Connection because doing so requires that a username and password be provided") + .explanation("Could not establish a Connection because doing so requires a valid username and password") .build()); } catch (final Exception e) { logger.warn("Failed to establish a connection to the JMS Server in order to verify configuration", e); @@ -160,7 +118,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl results.add(new ConfigVerificationResult.Builder() .verificationStepName(ESTABLISH_CONNECTION) .outcome(Outcome.FAILED) - .explanation("Was not able to establish a connection to the JMS Server: " + e.toString()) + .explanation("Was not able to establish a connection to the JMS Server: " + e) .build()); } @@ -191,7 +149,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl results.add(new ConfigVerificationResult.Builder() .verificationStepName(VERIFY_JMS_INTERACTION) .outcome(Outcome.FAILED) - .explanation("Was not able to create a JMS Session: " + failure.toString()) + .explanation("Was not able to create a JMS Session: " + failure) .build()); } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/CachedJMSConnectionFactoryHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/CachedJMSConnectionFactoryHandler.java new file mode 100644 index 0000000000..86e4953afd --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/CachedJMSConnectionFactoryHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.cf; + +import org.apache.nifi.logging.ComponentLog; + +import javax.jms.ConnectionFactory; + +public abstract class CachedJMSConnectionFactoryHandler implements JMSConnectionFactoryHandlerDefinition { + + private final ComponentLog logger; + + private volatile ConnectionFactory connectionFactory; + + protected CachedJMSConnectionFactoryHandler(ComponentLog logger) { + this.logger = logger; + } + + public abstract ConnectionFactory createConnectionFactory(); + + @Override + public synchronized ConnectionFactory getConnectionFactory() { + if (connectionFactory == null) { + connectionFactory = createConnectionFactory(); + } else { + logger.debug("Connection Factory has already been initialized. Will return cached instance."); + } + + return connectionFactory; + } + + @Override + public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) { + if (cachedFactory == connectionFactory) { + logger.debug("Resetting connection factory"); + connectionFactory = null; + } + } +} diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerDefinition.java similarity index 55% copy from nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java copy to nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerDefinition.java index 9e56e9bd8d..ebfe8c6980 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerDefinition.java @@ -16,24 +16,8 @@ */ package org.apache.nifi.jms.cf; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.controller.ConfigurationContext; - -import java.util.Map; - /** - * Sub-class of {@link JMSConnectionFactoryProvider} only for testing purpose + * Base interface of handler implementations of IJMSConnectionFactoryProvider. */ -public class JMSConnectionFactoryProviderForTest extends JMSConnectionFactoryProvider { - - @OnEnabled - @Override - public void onEnabled(ConfigurationContext context) { - delegate = new JMSConnectionFactoryHandlerForTest(context, getLogger()); - delegate.setConnectionFactoryProperties(); - } - - public Map<String, Object> getConfiguredProperties() { - return ((JMSConnectionFactoryHandlerForTest) delegate).getConfiguredProperties(); - } +public interface JMSConnectionFactoryHandlerDefinition extends IJMSConnectionFactoryProvider { } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java index 44de835738..7db1c69e20 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java @@ -38,46 +38,28 @@ import org.apache.nifi.ssl.SSLContextService; * implementation class and configuring the Connection Factory object directly. * The handler can be used from controller services and processors as well. */ -public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvider { +public class JMSConnectionFactoryHandler extends CachedJMSConnectionFactoryHandler { private final PropertyContext context; private final Set<PropertyDescriptor> propertyDescriptors; private final ComponentLog logger; public JMSConnectionFactoryHandler(ConfigurationContext context, ComponentLog logger) { + super(logger); this.context = context; this.propertyDescriptors = context.getProperties().keySet(); this.logger = logger; } public JMSConnectionFactoryHandler(ProcessContext context, ComponentLog logger) { + super(logger); this.context = context; this.propertyDescriptors = context.getProperties().keySet(); this.logger = logger; } - private volatile ConnectionFactory connectionFactory; - - @Override - public synchronized ConnectionFactory getConnectionFactory() { - if (connectionFactory == null) { - initConnectionFactory(); - } else { - logger.debug("Connection Factory has already been initialized. Will return cached instance."); - } - - return connectionFactory; - } - @Override - public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) { - if (cachedFactory == connectionFactory) { - logger.debug("Resetting connection factory"); - connectionFactory = null; - } - } - - private void initConnectionFactory() { + public ConnectionFactory createConnectionFactory() { try { if (logger.isInfoEnabled()) { logger.info("Configuring " + getClass().getSimpleName() + " for '" @@ -85,10 +67,11 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide + context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue() + "'"); } - createConnectionFactoryInstance(); - setConnectionFactoryProperties(); + final ConnectionFactory connectionFactory = createConnectionFactoryInstance(); + setConnectionFactoryProperties(connectionFactory); + + return connectionFactory; } catch (Exception e) { - connectionFactory = null; logger.error("Failed to configure " + getClass().getSimpleName(), e); throw new IllegalStateException(e); } @@ -98,9 +81,9 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide * Creates an instance of the {@link ConnectionFactory} from the provided * 'CONNECTION_FACTORY_IMPL'. */ - private void createConnectionFactoryInstance() { - String connectionFactoryImplName = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue(); - connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName); + private ConnectionFactory createConnectionFactoryInstance() { + final String connectionFactoryImplName = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue(); + return Utils.newDefaultInstance(connectionFactoryImplName); } /** @@ -135,18 +118,18 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setHostName_java.lang.String_">setHostName(String hostname)</a> * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setPort_int_">setPort(int port)</a> * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setConnectionNameList_java.lang.String_">setConnectionNameList(String hosts)</a> - * @see #setProperty(String propertyName, Object propertyValue) + * @see #setProperty(ConnectionFactory connectionFactory, String propertyName, Object propertyValue) */ - void setConnectionFactoryProperties() { + void setConnectionFactoryProperties(ConnectionFactory connectionFactory) { String connectionFactoryValue = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue(); if (context.getProperty(JMS_BROKER_URI).isSet()) { String brokerValue = context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue(); if (connectionFactoryValue.startsWith("org.apache.activemq")) { - setProperty("brokerURL", brokerValue); + setProperty(connectionFactory, "brokerURL", brokerValue); } else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) { - setProperty("serverUrl", brokerValue); + setProperty(connectionFactory, "serverUrl", brokerValue); } else if (connectionFactoryValue.startsWith("org.apache.qpid.jms")) { - setProperty("remoteURI", brokerValue); + setProperty(connectionFactory, "remoteURI", brokerValue); } else { String[] brokerList = brokerValue.split(","); if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) { @@ -159,14 +142,14 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide ibmConList.add(broker); } } - setProperty("connectionNameList", String.join(",", ibmConList)); + setProperty(connectionFactory, "connectionNameList", String.join(",", ibmConList)); } else { // Try to parse broker URI as colon separated host/port pair. Use first pair if multiple given. String[] hostPort = brokerList[0].split(":"); if (hostPort.length == 2) { // If broker URI indeed was colon separated host/port pair - setProperty("hostName", hostPort[0]); - setProperty("port", hostPort[1]); + setProperty(connectionFactory, "hostName", hostPort[0]); + setProperty(connectionFactory, "port", hostPort[1]); } } } @@ -177,21 +160,21 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide SSLContext sslContext = sslContextService.createContext(); if (connectionFactoryValue.startsWith("org.apache.activemq")) { if (sslContextService.isTrustStoreConfigured()) { - setProperty("trustStore", sslContextService.getTrustStoreFile()); - setProperty("trustStorePassword", sslContextService.getTrustStorePassword()); - setProperty("trustStoreType", sslContextService.getTrustStoreType()); + setProperty(connectionFactory, "trustStore", sslContextService.getTrustStoreFile()); + setProperty(connectionFactory, "trustStorePassword", sslContextService.getTrustStorePassword()); + setProperty(connectionFactory, "trustStoreType", sslContextService.getTrustStoreType()); } if (sslContextService.isKeyStoreConfigured()) { - setProperty("keyStore", sslContextService.getKeyStoreFile()); - setProperty("keyStorePassword", sslContextService.getKeyStorePassword()); - setProperty("keyStoreKeyPassword", sslContextService.getKeyPassword()); - setProperty("keyStoreType", sslContextService.getKeyStoreType()); + setProperty(connectionFactory, "keyStore", sslContextService.getKeyStoreFile()); + setProperty(connectionFactory, "keyStorePassword", sslContextService.getKeyStorePassword()); + setProperty(connectionFactory, "keyStoreKeyPassword", sslContextService.getKeyPassword()); + setProperty(connectionFactory, "keyStoreType", sslContextService.getKeyStoreType()); } } else if (connectionFactoryValue.startsWith("org.apache.qpid.jms")) { - setProperty("sslContext", sslContext); + setProperty(connectionFactory, "sslContext", sslContext); } else { // IBM MQ (and others) - setProperty("sSLSocketFactory", sslContext.getSocketFactory()); + setProperty(connectionFactory, "sSLSocketFactory", sslContext.getSocketFactory()); } } @@ -200,7 +183,7 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide .forEach(descriptor -> { String propertyName = descriptor.getName(); String propertyValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue(); - setProperty(propertyName, propertyValue); + setProperty(connectionFactory, propertyName, propertyValue); }); } @@ -222,7 +205,7 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide * follow bean convention and all their properties using Java primitives as * arguments. */ - void setProperty(String propertyName, Object propertyValue) { + void setProperty(ConnectionFactory connectionFactory, String propertyName, Object propertyValue) { String methodName = toMethodName(propertyName); Method[] methods = Utils.findMethods(methodName, connectionFactory.getClass()); if (methods != null && methods.length > 0) { @@ -248,7 +231,7 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide throw new IllegalStateException("Failed to set property " + propertyName, e); } } else if (propertyName.equals("hostName")) { - setProperty("host", propertyValue); // try 'host' as another common convention. + setProperty(connectionFactory, "host", propertyValue); // try 'host' as another common convention. } } @@ -262,4 +245,5 @@ public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvide c[0] = Character.toUpperCase(c[0]); return "set" + new String(c); } + } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java index 8a8c4b12fe..331624537d 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java @@ -22,27 +22,14 @@ import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; -import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.components.ConfigVerificationResult; -import org.apache.nifi.components.ConfigVerificationResult.Outcome; -import org.apache.nifi.controller.VerifiableControllerService; -import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.ExceptionListener; -import javax.jms.JMSSecurityException; -import javax.jms.Session; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; /** * Provides a factory service that creates and initializes @@ -72,11 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; ) } ) -public class JMSConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition, VerifiableControllerService { - private static final String ESTABLISH_CONNECTION = "Establish Connection"; - private static final String VERIFY_JMS_INTERACTION = "Verify JMS Interaction"; - - protected volatile JMSConnectionFactoryHandler delegate; +public class JMSConnectionFactoryProvider extends AbstractJMSConnectionFactoryProvider { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -88,111 +71,8 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl return JMSConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName); } - @OnEnabled - public void onEnabled(ConfigurationContext context) { - delegate = new JMSConnectionFactoryHandler(context, getLogger()); - } - - @OnDisabled - public void onDisabled() { - delegate = null; - } - - @Override - public ConnectionFactory getConnectionFactory() { - return delegate.getConnectionFactory(); - } - @Override - public void resetConnectionFactory(ConnectionFactory cachedFactory) { - delegate.resetConnectionFactory(cachedFactory); - } - - @Override - public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) { - final List<ConfigVerificationResult> results = new ArrayList<>(); - final JMSConnectionFactoryHandler handler = new JMSConnectionFactoryHandler(context, verificationLogger); - - final AtomicReference<Exception> failureReason = new AtomicReference<>(); - final ExceptionListener listener = failureReason::set; - - final Connection connection = createConnection(handler.getConnectionFactory(), results, listener, verificationLogger); - if (connection != null) { - try { - createSession(connection, results, failureReason.get(), verificationLogger); - } finally { - try { - connection.close(); - } catch (final Exception ignored) { - } - } - } - - return results; - } - - private Connection createConnection(final ConnectionFactory connectionFactory, final List<ConfigVerificationResult> results, final ExceptionListener exceptionListener, final ComponentLog logger) { - try { - final Connection connection = connectionFactory.createConnection(); - connection.setExceptionListener(exceptionListener); - - results.add(new ConfigVerificationResult.Builder() - .verificationStepName(ESTABLISH_CONNECTION) - .outcome(Outcome.SUCCESSFUL) - .explanation("Successfully established a JMS Connection") - .build()); - - return connection; - } catch (final JMSSecurityException se) { - // If we encounter a JMS Security Exception, the documentation states that it is because of an invalid username or password. - // There is no username or password configured for the Controller Service itself, however. Those are configured in processors, etc. - // As a result, if this is encountered, we will skip verification. - logger.debug("Failed to establish a connection to the JMS Server in order to verify configuration because encountered JMS Security Exception", se); - - results.add(new ConfigVerificationResult.Builder() - .verificationStepName(ESTABLISH_CONNECTION) - .outcome(Outcome.SKIPPED) - .explanation("Could not establish a Connection because doing so requires that a username and password be provided") - .build()); - } catch (final Exception e) { - logger.warn("Failed to establish a connection to the JMS Server in order to verify configuration", e); - - results.add(new ConfigVerificationResult.Builder() - .verificationStepName(ESTABLISH_CONNECTION) - .outcome(Outcome.FAILED) - .explanation("Was not able to establish a connection to the JMS Server: " + e.toString()) - .build()); - } - - return null; - } - - private void createSession(final Connection connection, final List<ConfigVerificationResult> results, final Exception capturedException, final ComponentLog logger) { - try { - final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - session.close(); - - results.add(new ConfigVerificationResult.Builder() - .verificationStepName(VERIFY_JMS_INTERACTION) - .outcome(Outcome.SUCCESSFUL) - .explanation("Established a JMS Session with server and successfully terminated it") - .build()); - } catch (final Exception e) { - final Exception failure; - if (capturedException == null) { - failure = e; - } else { - failure = capturedException; - failure.addSuppressed(e); - } - - logger.warn("Failed to create a JMS Session in order to verify configuration", failure); - - results.add(new ConfigVerificationResult.Builder() - .verificationStepName(VERIFY_JMS_INTERACTION) - .outcome(Outcome.FAILED) - .explanation("Was not able to create a JMS Session: " + failure.toString()) - .build()); - } + protected JMSConnectionFactoryHandlerDefinition createConnectionFactoryHandler(ConfigurationContext context, ComponentLog logger) { + return new JMSConnectionFactoryHandler(context, logger); } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java index 53c216383e..468a1455ad 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java @@ -43,43 +43,29 @@ import static org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_PRO * Handler class to retrieve a JMS Connection Factory object via JNDI. * The handler can be used from controller services and processors as well. */ -public class JndiJmsConnectionFactoryHandler implements IJMSConnectionFactoryProvider { +public class JndiJmsConnectionFactoryHandler extends CachedJMSConnectionFactoryHandler { private final PropertyContext context; private final Set<PropertyDescriptor> propertyDescriptors; private final ComponentLog logger; - private volatile ConnectionFactory connectionFactory; - public JndiJmsConnectionFactoryHandler(ConfigurationContext context, ComponentLog logger) { + super(logger); this.context = context; this.propertyDescriptors = context.getProperties().keySet(); this.logger = logger; } public JndiJmsConnectionFactoryHandler(ProcessContext context, ComponentLog logger) { + super(logger); this.context = context; this.propertyDescriptors = context.getProperties().keySet(); this.logger = logger; } @Override - public synchronized ConnectionFactory getConnectionFactory() { - if (connectionFactory == null) { - connectionFactory = lookupConnectionFactory(); - } else { - logger.debug("Connection Factory has already been obtained from JNDI. Will return cached instance."); - } - - return connectionFactory; - } - - @Override - public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) { - if (cachedFactory == connectionFactory) { - logger.debug("Resetting connection factory"); - connectionFactory = null; - } + public ConnectionFactory createConnectionFactory() { + return lookupConnectionFactory(); } private ConnectionFactory lookupConnectionFactory() { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java index f8e087aa4b..4d349bee03 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java @@ -19,6 +19,7 @@ package org.apache.nifi.jms.cf; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; +import javax.jms.ConnectionFactory; import java.util.HashMap; import java.util.Map; @@ -27,14 +28,15 @@ import java.util.Map; */ public class JMSConnectionFactoryHandlerForTest extends JMSConnectionFactoryHandler { - private Map<String, Object> configuredProperties = new HashMap<>(); + private final Map<String, Object> configuredProperties = new HashMap<>(); public JMSConnectionFactoryHandlerForTest(ConfigurationContext context, ComponentLog logger) { super(context, logger); + setConnectionFactoryProperties(null); } @Override - void setProperty(String propertyName, Object propertyValue) { + void setProperty(ConnectionFactory connectionFactory, String propertyName, Object propertyValue) { configuredProperties.put(propertyName, propertyValue); } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java index 9e56e9bd8d..9223b3bde5 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java @@ -30,7 +30,6 @@ public class JMSConnectionFactoryProviderForTest extends JMSConnectionFactoryPro @Override public void onEnabled(ConfigurationContext context) { delegate = new JMSConnectionFactoryHandlerForTest(context, getLogger()); - delegate.setConnectionFactoryProperties(); } public Map<String, Object> getConfiguredProperties() {