This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 833eea4547fa076723a1ff9d7bdcae5f37bbf497 Author: Peter Turcsanyi <[email protected]> AuthorDate: Wed Mar 11 15:46:05 2020 +0100 NIFI-7245: JMS processors yield when connection factory initialisation failed Signed-off-by: Pierre Villard <[email protected]> This closes #4133. --- .../nifi/jms/processors/AbstractJMSProcessor.java | 8 ++++++- .../apache/nifi/jms/processors/ConsumeJMSIT.java | 26 ++++++++++++++++++++++ .../apache/nifi/jms/processors/PublishJMSIT.java | 25 +++++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 39b8dac..78b2d90 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -179,7 +179,13 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { T worker = workerPool.poll(); if (worker == null) { - worker = buildTargetResource(context); + try { + worker = buildTargetResource(context); + } catch (Exception e) { + getLogger().error("Failed to initialize JMS Connection Factory", e); + context.yield(); + return; + } } try { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java index 714b950..64728fe 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java @@ -31,6 +31,8 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.wireformat.WireFormat; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; @@ -418,6 +420,30 @@ public class ConsumeJMSIT { } } + @Test + public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() throws Exception { + TestRunner runner = TestRunners.newTestRunner(ConsumeJMS.class); + + // using (non-JNDI) JMS Connection Factory via controller service + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, "DummyJMSConnectionFactoryClass"); + runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, "DummyBrokerUri"); + runner.enableControllerService(cfProvider); + + runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, "myTopic"); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC); + + try { + runner.run(); + fail("The test was implemented in a way this line should not be reached."); + } catch (AssertionError e) { + } finally { + assertTrue("In case of an exception, the processor should be yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + } + } + private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException { // Publish a message. try (Connection conn = cf.createConnection(); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java index 247a3ac..b901b73 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,10 +32,12 @@ import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.wireformat.WireFormat; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; +import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; @@ -381,4 +384,26 @@ public class PublishJMSIT { } } } + + @Test + public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() throws Exception { + TestRunner runner = TestRunners.newTestRunner(PublishJMS.class); + + // using JNDI JMS Connection Factory configured locally on the processor + runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, "DummyInitialContextFactoryClass"); + runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL, "DummyProviderUrl"); + runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, "DummyConnectionFactoryName"); + + runner.setProperty(ConsumeJMS.DESTINATION, "myTopic"); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC); + + try { + runner.enqueue("message"); + runner.run(); + fail("The test was implemented in a way this line should not be reached."); + } catch (AssertionError e) { + } finally { + assertTrue("In case of an exception, the processor should be yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + } + } }
