This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 82b4fb0 NIFI-6957 - Added REGEX header property, and option to allow
illegal chars in header names
82b4fb0 is described below
commit 82b4fb06338aa1019f70950b0d467352d36d66b0
Author: r65535 <[email protected]>
AuthorDate: Wed Dec 18 10:07:55 2019 +0000
NIFI-6957 - Added REGEX header property, and option to allow illegal chars
in header names
Signed-off-by: Pierre Villard <[email protected]>
This closes #3942.
---
.../nifi/jms/processors/AbstractJMSProcessor.java | 21 ++++++++++
.../org/apache/nifi/jms/processors/ConsumeJMS.java | 2 +
.../apache/nifi/jms/processors/JMSPublisher.java | 7 +---
.../org/apache/nifi/jms/processors/PublishJMS.java | 31 +++++++++++++-
.../jms/processors/JMSPublisherConsumerIT.java | 9 ++--
.../apache/nifi/jms/processors/PublishJMSIT.java | 49 ++++++++++++++++++++++
6 files changed, 106 insertions(+), 13 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index e51238d..e7a29d3 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -118,6 +118,25 @@ abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcess
.defaultValue(Charset.defaultCharset().name())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+ static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new
PropertyDescriptor.Builder()
+ .name("allow-illegal-chars-in-jms-header-names")
+ .displayName("Allow Illegal Characters in Header Names")
+ .description("Specifies whether illegal characters in header names
should be sent to the JMS broker. " +
+ "Usually hyphens and full-stops.")
+ .required(true)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new
PropertyDescriptor.Builder()
+ .name("attributes-to-send-as-jms-headers-regex")
+ .displayName("Attributes to Send as JMS Headers (Regex)")
+ .description("Specifies the Regular Expression that determines the
names of FlowFile attributes that" +
+ " should be sent as JMS Headers")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .defaultValue(".*")
+ .required(true)
+ .build();
static final PropertyDescriptor CF_SERVICE = new
PropertyDescriptor.Builder()
@@ -141,6 +160,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcess
propertyDescriptors.add(SESSION_CACHE_SIZE);
propertyDescriptors.add(MESSAGE_BODY);
propertyDescriptors.add(CHARSET);
+ propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
+ propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 4b149e2..eb4597c 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -151,6 +151,8 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(propertyDescriptors);
_propertyDescriptors.remove(MESSAGE_BODY);
+ _propertyDescriptors.remove(ALLOW_ILLEGAL_HEADER_CHARS);
+ _propertyDescriptors.remove(ATTRIBUTES_AS_HEADERS_REGEX);
// change the validator on CHARSET property
_propertyDescriptors.remove(CHARSET);
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 1ea61b6..17d5690 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -35,7 +35,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Collectors;
/**
* Generic publisher of messages to JMS compliant messaging system.
@@ -77,11 +76,7 @@ final class JMSPublisher extends JMSWorker {
void setMessageHeaderAndProperties(final Message message, final
Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
- Map<String, String> flowFileAttributesToSend =
flowFileAttributes.entrySet().stream()
- .filter(entry -> !entry.getKey().contains("-") &&
!entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property
names
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-
- for (Entry<String, String> entry :
flowFileAttributesToSend.entrySet()) {
+ for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
try {
if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 12451cf..c95ec9d 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -43,8 +43,11 @@ import javax.jms.Message;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
/**
* An implementation of JMS Message publishing {@link Processor} which upon
each
@@ -121,10 +124,34 @@ public class PublishJMS extends
AbstractJMSProcessor<JMSPublisher> {
try {
String destinationName =
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ Boolean allowIllegalChars =
context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
+ String attributeHeaderRegex =
context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
+
+ Map<String,String> attributesToSend = new HashMap<>();
+ // REGEX Attributes
+ final Pattern pattern = Pattern.compile(attributeHeaderRegex);
+ for (final Map.Entry<String, String> entry :
flowFile.getAttributes().entrySet()) {
+ final String key = entry.getKey();
+ if (pattern.matcher(key).matches()) {
+ attributesToSend.put(key, flowFile.getAttribute(key));
+ }
+ }
+
+ // Optionally remove illegal headers names apart from .type
attributes for JMS variable types
+ if (!allowIllegalChars) {
+ for (final Map.Entry<String,String> entry :
attributesToSend.entrySet()) {
+ if (!entry.getKey().endsWith(".type")){
+ if (entry.getKey().contains("-") ||
entry.getKey().contains(".")) {
+ attributesToSend.remove(entry.getKey());
+ }
+ }
+ }
+ }
+
switch (context.getProperty(MESSAGE_BODY).getValue()) {
case TEXT_MESSAGE:
try {
- publisher.publish(destinationName,
this.extractTextMessageBody(flowFile, processSession, charset),
flowFile.getAttributes());
+ publisher.publish(destinationName,
this.extractTextMessageBody(flowFile, processSession, charset),
attributesToSend);
} catch(Exception e) {
publisher.setValid(false);
throw e;
@@ -133,7 +160,7 @@ public class PublishJMS extends
AbstractJMSProcessor<JMSPublisher> {
case BYTES_MESSAGE:
default:
try {
- publisher.publish(destinationName,
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+ publisher.publish(destinationName,
this.extractMessageBody(flowFile, processSession), attributesToSend);
} catch(Exception e) {
publisher.setValid(false);
throw e;
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index 12474ec..bc480a2 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -17,7 +17,6 @@
package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -74,8 +73,8 @@ public class JMSPublisherConsumerIT {
JMSPublisher publisher = new
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(),
jmsTemplate, mock(ComponentLog.class));
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("foo", "foo");
- flowFileAttributes.put("illegal-property", "value");
- flowFileAttributes.put("another.illegal", "value");
+ flowFileAttributes.put("hyphen-property", "value");
+ flowFileAttributes.put("fullstop.property", "value");
flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1");
flowFileAttributes.put(JmsHeaders.PRIORITY, "1");
@@ -85,8 +84,8 @@ public class JMSPublisherConsumerIT {
Message receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage instanceof BytesMessage);
assertEquals("foo", receivedMessage.getStringProperty("foo"));
- assertFalse(receivedMessage.propertyExists("illegal-property"));
- assertFalse(receivedMessage.propertyExists("another.illegal"));
+ assertTrue(receivedMessage.propertyExists("hyphen-property"));
+ assertTrue(receivedMessage.propertyExists("fullstop.property"));
assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
assertEquals(1, receivedMessage.getJMSDeliveryMode());
assertEquals(1, receivedMessage.getJMSPriority());
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index fa0bd7a..ad3febd 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -36,6 +36,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,6 +63,7 @@ public class PublishJMSIT {
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+ attributes.put("test-attribute", "value");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want
the Connection Factory left in tact so that we can use it.
@@ -75,6 +77,7 @@ public class PublishJMSIT {
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue)
message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
+ assertNull(message.getStringProperty("test-attribute"));
runner.run(1, true, false); // Run once just so that we can trigger
the shutdown of the Connection Factory
}
@@ -253,4 +256,50 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger
the shutdown of the Connection Factory
}
+
+ @Test(timeout = 10000)
+ public void validateRegexAndIllegalHeaders() throws Exception {
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+ final String destinationName = "validatePublishTextMessage";
+ PublishJMS pubProc = new PublishJMS();
+ TestRunner runner = TestRunners.newTestRunner(pubProc);
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(PublishJMS.DESTINATION, destinationName);
+ runner.setProperty(PublishJMS.MESSAGE_BODY, "text");
+ runner.setProperty(PublishJMS.ATTRIBUTES_AS_HEADERS_REGEX,
"^((?!bar).)*$");
+ runner.setProperty(PublishJMS.ALLOW_ILLEGAL_HEADER_CHARS, "true");
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("foo", "foo");
+ attributes.put("bar", "bar");
+ attributes.put("test-header-with-hyphen", "value");
+ attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+ runner.enqueue("Hey dude!".getBytes(), attributes);
+ runner.run(1, false);
+
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+
+ JmsTemplate jmst = new JmsTemplate(cf);
+ Message message = jmst.receive(destinationName);
+ assertTrue(message instanceof TextMessage);
+ TextMessage textMessage = (TextMessage) message;
+
+ byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage);
+ assertEquals("Hey dude!", new String(messageBytes));
+ assertEquals("cooQueue", ((Queue)
message.getJMSReplyTo()).getQueueName());
+ assertEquals("foo", message.getStringProperty("foo"));
+ assertEquals("value",
message.getStringProperty("test-header-with-hyphen"));
+ assertNull(message.getStringProperty("bar"));
+
+ runner.run(1, true, false); // Run once just so that we can trigger
the shutdown of the Connection Factory
+ }
}