This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c1301e1 NIFI-6915 This closes #3961. Jms Durable non shared
subscription is broken Revert NIFI-4834 enhancement for durable non shared
consumers only.
c1301e1 is described below
commit c1301e196c5be89819e702ec76912f3c426f3055
Author: Gardella Juan Pablo <[email protected]>
AuthorDate: Tue Jan 7 03:42:43 2020 -0300
NIFI-6915 This closes #3961. Jms Durable non shared subscription is broken
Revert NIFI-4834 enhancement for durable non shared consumers only.
Also made the AbstractJMSProcessor class public. Without that change the
integration tests org.apache.nifi.jms.processors.PublishJMSIT and
org.apache.nifi.jms.processors.ConsumeJMSIT do not work, because the
@OnScheduled method is not called (because it is not public).
The method org.apache.nifi.util.StandardProcessorTestRunner.run(int
iterations, boolean stopOnFinish, boolean initialize, long runWait) uses
ReflectionUtils.invokeMethodsWithAnnotation, which does not call non-public
methods.
Signed-off-by: Joe Witt <[email protected]>
---
.../nifi/jms/processors/AbstractJMSProcessor.java | 31 ++++--
.../org/apache/nifi/jms/processors/ConsumeJMS.java | 47 +++++++-
.../apache/nifi/jms/processors/ConsumeJMSIT.java | 121 ++++++++++++++++++++-
3 files changed, 183 insertions(+), 16 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 82ed075..33cc87c 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -29,6 +29,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
import
org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;
@@ -49,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @see ConsumeJMS
* @see JMSConnectionFactoryProviderDefinition
*/
-abstract class AbstractJMSProcessor<T extends JMSWorker> extends
AbstractProcessor {
+public abstract class AbstractJMSProcessor<T extends JMSWorker> extends
AbstractProcessor {
static final String QUEUE = "QUEUE";
static final String TOPIC = "TOPIC";
@@ -164,6 +165,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcess
propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
}
+ protected static String getClientId(ProcessContext context) {
+ return
context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
+ }
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
@@ -258,12 +263,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcess
cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
final CachingConnectionFactory cachingFactory = new
CachingConnectionFactory(cfCredentialsAdapter);
-
- String clientId =
context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
- if (clientId != null) {
- clientId = clientId + "-" + clientIdCounter.getAndIncrement();
- cachingFactory.setClientId(clientId);
- }
+ setClientId(context, cachingFactory);
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(cachingFactory);
@@ -271,4 +271,21 @@ abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcess
return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
}
+
+ /**
+ * Set clientId for JMS connections when <tt>clientId</tt> is not null.
+ * It is overridden by {@code}ConsumeJMS{@code} when durable subscriptions
+ * is configured on the processor.
+ * @param context context.
+ * @param connectionFactory the connection factory.
+ * @since NIFI-6915
+ */
+ protected void setClientId(ProcessContext context, final
SingleConnectionFactory connectionFactory) {
+ String clientId = getClientId(context);
+ if (clientId != null) {
+ clientId = clientId + "-" + clientIdCounter.getAndIncrement();
+ connectionFactory.setClientId(clientId);
+ }
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index fac995c..3e278bc 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -38,6 +39,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
@@ -188,6 +190,23 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
relationships = Collections.unmodifiableSet(_relationships);
}
+    /**
+     * Tells whether the {@code DURABLE_SUBSCRIBER} property evaluates to
+     * {@code true}. An unset (null) value counts as {@code false}.
+     *
+     * @param context the process context holding the processor configuration.
+     * @return {@code true} only when the property evaluates to {@code true}.
+     */
+    private static boolean isDurableSubscriber(final ProcessContext context) {
+        return Boolean.TRUE.equals(
+                context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean());
+    }
+
+ private static boolean isShared(final ProcessContext context) {
+ final Boolean sharedBoolean =
context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
+ return sharedBoolean == null ? false : sharedBoolean;
+ }
+
+    /**
+     * Rejects scheduling when the processor is configured with more than one
+     * concurrent task and a durable, non shared subscription.
+     *
+     * @param context the process context holding the processor configuration.
+     * @throws ProcessException when more than one concurrent task is
+     *         combined with a durable, non shared subscription.
+     */
+    @OnScheduled
+    public void onSchedule(ProcessContext context) {
+        // Checks are made in the same order as a short-circuit '&&' chain.
+        if (context.getMaxConcurrentTasks() <= 1) {
+            return;
+        }
+        if (!isDurableSubscriber(context)) {
+            return;
+        }
+        if (isShared(context)) {
+            return;
+        }
+        throw new ProcessException("Durable non shared subscriptions cannot work on multiple threads. Check javax/jms/Session#createDurableConsumer API doc.");
+    }
+
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
final List<ValidationResult> validationResults = new
ArrayList<>(super.customValidate(validationContext));
@@ -203,7 +222,6 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
"'" + DESTINATION_TYPE.getDisplayName() + "'='" + QUEUE +
"'")
.build());
}
-
return validationResults;
}
@@ -218,10 +236,8 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
protected void rendezvousWithJms(final ProcessContext context, final
ProcessSession processSession, final JMSConsumer consumer) throws
ProcessException {
final String destinationName =
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final String errorQueueName =
context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
- final Boolean durableBoolean =
context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
- final boolean durable = durableBoolean == null ? false :
durableBoolean;
- final Boolean sharedBoolean =
context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
- final boolean shared = sharedBoolean == null ? false : sharedBoolean;
+ final boolean durable = isDurableSubscriber(context);
+ final boolean shared = isShared(context);
final String subscriptionName =
context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
@@ -280,6 +296,27 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
}
/**
+ * <p>
+ * Use provided clientId for non shared durable consumers, if not set
+ * always a different value as defined in {@link
AbstractJMSProcessor#setClientId(ProcessContext, SingleConnectionFactory)}.
+ * </p>
+ * See {@link Session#createDurableConsumer(javax.jms.Topic, String,
String, boolean)},
+ * in special following part: <i>An unshared durable subscription is
+ * identified by a name specified by the client and by the client
identifier,
+ * which must be set. An application which subsequently wishes to create
+ * a consumer on that unshared durable subscription must use the same
+ * client identifier.</i>
+ */
+ @Override
+ protected void setClientId(ProcessContext context, SingleConnectionFactory
cachingFactory) {
+ if (isDurableSubscriber(context) && !isShared(context)) {
+ cachingFactory.setClientId(getClientId(context));
+ } else {
+ super.setClientId(context, cachingFactory);
+ }
+ }
+
+ /**
* Copies JMS attributes (i.e., headers and properties) as FF attributes.
* Given that FF attributes mandate that values are of type String, the
* copied values of JMS attributes will be "stringified" via
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index f40f60c..e5ca276 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -18,16 +18,17 @@ package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -37,10 +38,16 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import javax.jms.BytesMessage;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
+import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
@@ -50,7 +57,7 @@ public class ConsumeJMSIT {
@Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws
Exception {
- final String destinationName = "cooQueue";
+ final String destinationName = "cooQueue";
JmsTemplate jmsTemplate =
CommonTest.buildJmsTemplateForDestination(false);
try {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory)
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
@@ -258,4 +265,110 @@ public class ConsumeJMSIT {
return message;
}
+
+ /**
+ * Validates <a
href="https://issues.apache.org/jira/browse/NIFI-6915">NIFI-6915</a>.
+ * <p>
+ * The test consists on:
+ * <ul>
+ * <li>Start a durable non shared consumer <tt>C1</tt> with client id
<tt>client1</tt> subscribed to topic <tt>T</tt>.</li>
+ * <li>Stop <tt>C1</tt>.</li>
+ * <li>Publish a message <tt>M1</tt> to topic <tt>T</tt>.</li>
+ * <li>Start <tt>C1</tt>.</li>
+ * </ul>
+ * It is expected <tt>C1</tt> receives message <tt>M1</tt>.
+ * </p>
+ * @throws Exception unexpected
+ */
+ @Test(timeout = 10000)
+ public void validateNifi6915() throws Exception {
+ BrokerService broker = new BrokerService();
+ try {
+ broker.setPersistent(false);
+ broker.setBrokerName("broker1");
+ broker.start();
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("vm://broker1");
+ final String destinationName = "validateNifi6915";
+
+ TestRunner c1Consumer = createNonSharedDurableConsumer(cf,
destinationName);
+ // 1. Start a durable non shared consumer C1 with client id
client1 subscribed to topic T.
+ boolean stopConsumer = true;
+ c1Consumer.run(1, stopConsumer);
+ List<MockFlowFile> flowFiles =
c1Consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertTrue("Expected no messages", flowFiles.isEmpty());
+ // 2. Publish a message M1 to topic T.
+ publishAMessage(cf, destinationName, "Hi buddy!!");
+ // 3. Start C1.
+ c1Consumer.run(1, true);
+ flowFiles =
c1Consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ // It is expected C1 receives message M1.
+ final MockFlowFile successFF = flowFiles.get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeExists(JmsHeaders.DESTINATION);
+ successFF.assertAttributeEquals(JmsHeaders.DESTINATION,
destinationName);
+ successFF.assertContentEquals("Hi buddy!!".getBytes());
+ assertEquals(destinationName,
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME));
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void validateNifi6915OnlyOneThreadAllowed() {
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ final String destinationName = "validateNifi6915";
+ try {
+ TestRunner runner = createNonSharedDurableConsumer(cf,
destinationName);
+ runner.setThreadCount(2);
+ runner.run(1, true);
+ fail();
+ } catch (Throwable e) {
+ // Unable to capture the message :(
+ }
+
+ TestRunner runner = createNonSharedDurableConsumer(cf,
destinationName);
+ // using one thread, it should not fail.
+ runner.setThreadCount(1);
+ runner.run(1, true);
+ }
+
+ private static void publishAMessage(ActiveMQConnectionFactory cf, final
String destinationName, String messageContent) throws JMSException {
+ // Publish a message.
+ try (Connection conn = cf.createConnection();
+ Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createTopic(destinationName))) {
+ producer.send(session.createTextMessage(messageContent));
+ }
+ }
+
+ private static TestRunner
createNonSharedDurableConsumer(ActiveMQConnectionFactory cf, final String
destinationName) {
+ ConsumeJMS c1 = new ConsumeJMS();
+ TestRunner c1Consumer = TestRunners.newTestRunner(c1);
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+
+ try {
+ c1Consumer.addControllerService("cfProvider", cs);
+ } catch (InitializationException e) {
+ throw new IllegalStateException(e);
+ }
+ c1Consumer.enableControllerService(cs);
+
+ c1Consumer.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+ c1Consumer.setProperty(ConsumeJMS.DESTINATION, destinationName);
+ c1Consumer.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+ c1Consumer.setProperty(ConsumeJMS.DURABLE_SUBSCRIBER, "true");
+ c1Consumer.setProperty(ConsumeJMS.SUBSCRIPTION_NAME,
"SubscriptionName");
+ c1Consumer.setProperty(ConsumeJMS.SHARED_SUBSCRIBER, "false");
+ c1Consumer.setProperty(ConsumeJMS.CLIENT_ID, "client1");
+ return c1Consumer;
+ }
+
}