This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4412460331ae3e751f4b699ec82f8bd292b9d220 Author: Gardella Juan Pablo <[email protected]> AuthorDate: Tue Jan 21 17:07:53 2020 -0300 NIFI-7050 ConsumeJMS is not yielded in case of exception This closes #4004. Signed-off-by: Peter Turcsanyi <[email protected]> --- .../org/apache/nifi/jms/processors/ConsumeJMS.java | 1 + .../apache/nifi/jms/processors/ConsumeJMSIT.java | 25 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index 80f9457..357e2f9 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -280,6 +280,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { }); } catch(Exception e) { consumer.setValid(false); + context.yield(); throw e; // for backward compatibility with exception handling in flows } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java index 085db0e..714b950 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java @@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; @@ -393,6 +394,30 @@ public class ConsumeJMSIT { } } + @Test(timeout = 10000) + public void whenExceptionIsRaisedTheProcessorShouldBeYielded() throws Exception { + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://invalidhost:9999?soTimeout=3"); + + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, "foo"); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC); + + try { + runner.run(); + fail("The test was implemented in a way this line should not be reached."); + } catch (AssertionError e) { + } finally { + assertTrue("In case of an exception, the processor should be yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + } + } + private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException { // Publish a message. try (Connection conn = cf.createConnection();
