This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3de3ad4  NIFI-7245: JMS processors yield when connection factory initialisation failed
3de3ad4 is described below

commit 3de3ad40290ccd4c9e09d5c4fd03e83a6cbf0d86
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Mar 11 15:46:05 2020 +0100

    NIFI-7245: JMS processors yield when connection factory initialisation failed
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4133.
---
 .../nifi/jms/processors/AbstractJMSProcessor.java  |  8 ++++++-
 .../apache/nifi/jms/processors/ConsumeJMSIT.java   | 26 ++++++++++++++++++++++
 .../apache/nifi/jms/processors/PublishJMSIT.java   | 25 +++++++++++++++++++++
 3 files changed, 58 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 39b8dac..78b2d90 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -179,7 +179,13 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         T worker = workerPool.poll();
         if (worker == null) {
-            worker = buildTargetResource(context);
+            try {
+                worker = buildTargetResource(context);
+            } catch (Exception e) {
+                getLogger().error("Failed to initialize JMS Connection 
Factory", e);
+                context.yield();
+                return;
+            }
         }
 
         try {
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index 714b950..64728fe 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -31,6 +31,8 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -418,6 +420,30 @@ public class ConsumeJMSIT {
         }
     }
 
+    @Test
+    public void 
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
 throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(ConsumeJMS.class);
+
+        // using (non-JNDI) JMS Connection Factory via controller service
+        JMSConnectionFactoryProvider cfProvider = new 
JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+        runner.setProperty(cfProvider, 
JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, 
"DummyJMSConnectionFactoryClass");
+        runner.setProperty(cfProvider, 
JMSConnectionFactoryProperties.JMS_BROKER_URI, "DummyBrokerUri");
+        runner.enableControllerService(cfProvider);
+
+        runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
+        runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+        try {
+            runner.run();
+            fail("The test was implemented in a way this line should not be 
reached.");
+        } catch (AssertionError e) {
+        } finally {
+            assertTrue("In case of an exception, the processor should be 
yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled());
+        }
+    }
+
     private static void publishAMessage(ActiveMQConnectionFactory cf, final 
String destinationName, String messageContent) throws JMSException {
         // Publish a message.
         try (Connection conn = cf.createConnection();
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 247a3ac..b901b73 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -31,10 +32,12 @@ import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
@@ -381,4 +384,26 @@ public class PublishJMSIT {
             }
         }
     }
+
+    @Test
+    public void 
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
 throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(PublishJMS.class);
+
+        // using JNDI JMS Connection Factory configured locally on the 
processor
+        
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY,
 "DummyInitialContextFactoryClass");
+        
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL, 
"DummyProviderUrl");
+        
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME,
 "DummyConnectionFactoryName");
+
+        runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
+        runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+        try {
+            runner.enqueue("message");
+            runner.run();
+            fail("The test was implemented in a way this line should not be 
reached.");
+        } catch (AssertionError e) {
+        } finally {
+            assertTrue("In case of an exception, the processor should be 
yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled());
+        }
+    }
 }

Reply via email to