This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b4fb0  NIFI-6957 - Added REGEX header property, and option to allow illegal chars in header names
82b4fb0 is described below

commit 82b4fb06338aa1019f70950b0d467352d36d66b0
Author: r65535 <[email protected]>
AuthorDate: Wed Dec 18 10:07:55 2019 +0000

    NIFI-6957 - Added REGEX header property, and option to allow illegal chars in header names
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3942.
---
 .../nifi/jms/processors/AbstractJMSProcessor.java  | 21 ++++++++++
 .../org/apache/nifi/jms/processors/ConsumeJMS.java |  2 +
 .../apache/nifi/jms/processors/JMSPublisher.java   |  7 +---
 .../org/apache/nifi/jms/processors/PublishJMS.java | 31 +++++++++++++-
 .../jms/processors/JMSPublisherConsumerIT.java     |  9 ++--
 .../apache/nifi/jms/processors/PublishJMSIT.java   | 49 ++++++++++++++++++++++
 6 files changed, 106 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index e51238d..e7a29d3 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -118,6 +118,25 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
             .defaultValue(Charset.defaultCharset().name())
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
+    static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new 
PropertyDescriptor.Builder()
+            .name("allow-illegal-chars-in-jms-header-names")
+            .displayName("Allow Illegal Characters in Header Names")
+            .description("Specifies whether illegal characters in header names 
should be sent to the JMS broker. " +
+                    "Usually hyphens and full-stops.")
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new 
PropertyDescriptor.Builder()
+            .name("attributes-to-send-as-jms-headers-regex")
+            .displayName("Attributes to Send as JMS Headers (Regex)")
+            .description("Specifies the Regular Expression that determines the 
names of FlowFile attributes that" +
+                    " should be sent as JMS Headers")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .defaultValue(".*")
+            .required(true)
+            .build();
 
 
     static final PropertyDescriptor CF_SERVICE = new 
PropertyDescriptor.Builder()
@@ -141,6 +160,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
         propertyDescriptors.add(SESSION_CACHE_SIZE);
         propertyDescriptors.add(MESSAGE_BODY);
         propertyDescriptors.add(CHARSET);
+        propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
+        propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 4b149e2..eb4597c 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -151,6 +151,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(propertyDescriptors);
         _propertyDescriptors.remove(MESSAGE_BODY);
+        _propertyDescriptors.remove(ALLOW_ILLEGAL_HEADER_CHARS);
+        _propertyDescriptors.remove(ATTRIBUTES_AS_HEADERS_REGEX);
 
         // change the validator on CHARSET property
         _propertyDescriptors.remove(CHARSET);
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 1ea61b6..17d5690 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -35,7 +35,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 /**
  * Generic publisher of messages to JMS compliant messaging system.
@@ -77,11 +76,7 @@ final class JMSPublisher extends JMSWorker {
     void setMessageHeaderAndProperties(final Message message, final 
Map<String, String> flowFileAttributes) throws JMSException {
         if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
 
-            Map<String, String> flowFileAttributesToSend = 
flowFileAttributes.entrySet().stream()
-                    .filter(entry -> !entry.getKey().contains("-") && 
!entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property 
names
-                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-
-            for (Entry<String, String> entry : 
flowFileAttributesToSend.entrySet()) {
+            for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
                 try {
                     if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
                         
this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 12451cf..c95ec9d 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -43,8 +43,11 @@ import javax.jms.Message;
 import java.io.StringWriter;
 import java.nio.charset.Charset;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
  * An implementation of JMS Message publishing {@link Processor} which upon 
each
@@ -121,10 +124,34 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
             try {
                 String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
                 String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+                Boolean allowIllegalChars = 
context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
+                String attributeHeaderRegex = 
context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
+
+                Map<String,String> attributesToSend = new HashMap<>();
+                // REGEX Attributes
+                final Pattern pattern = Pattern.compile(attributeHeaderRegex);
+                for (final Map.Entry<String, String> entry : 
flowFile.getAttributes().entrySet()) {
+                    final String key = entry.getKey();
+                    if (pattern.matcher(key).matches()) {
+                        attributesToSend.put(key, flowFile.getAttribute(key));
+                    }
+                }
+
+                // Optionally remove illegal headers names apart from .type 
attributes for JMS variable types
+                if (!allowIllegalChars) {
+                    for (final Map.Entry<String,String> entry : 
attributesToSend.entrySet()) {
+                        if (!entry.getKey().endsWith(".type")){
+                            if (entry.getKey().contains("-") || 
entry.getKey().contains(".")) {
+                                attributesToSend.remove(entry.getKey());
+                            }
+                        }
+                    }
+                }
+
                 switch (context.getProperty(MESSAGE_BODY).getValue()) {
                     case TEXT_MESSAGE:
                         try {
-                            publisher.publish(destinationName, 
this.extractTextMessageBody(flowFile, processSession, charset), 
flowFile.getAttributes());
+                            publisher.publish(destinationName, 
this.extractTextMessageBody(flowFile, processSession, charset), 
attributesToSend);
                         } catch(Exception e) {
                             publisher.setValid(false);
                             throw e;
@@ -133,7 +160,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
                     case BYTES_MESSAGE:
                     default:
                         try {
-                            publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+                            publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), attributesToSend);
                         } catch(Exception e) {
                             publisher.setValid(false);
                             throw e;
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index 12474ec..bc480a2 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.jms.processors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -74,8 +73,8 @@ public class JMSPublisherConsumerIT {
             JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
             Map<String, String> flowFileAttributes = new HashMap<>();
             flowFileAttributes.put("foo", "foo");
-            flowFileAttributes.put("illegal-property", "value");
-            flowFileAttributes.put("another.illegal", "value");
+            flowFileAttributes.put("hyphen-property", "value");
+            flowFileAttributes.put("fullstop.property", "value");
             flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
             flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1");
             flowFileAttributes.put(JmsHeaders.PRIORITY, "1");
@@ -85,8 +84,8 @@ public class JMSPublisherConsumerIT {
             Message receivedMessage = jmsTemplate.receive(destinationName);
             assertTrue(receivedMessage instanceof BytesMessage);
             assertEquals("foo", receivedMessage.getStringProperty("foo"));
-            assertFalse(receivedMessage.propertyExists("illegal-property"));
-            assertFalse(receivedMessage.propertyExists("another.illegal"));
+            assertTrue(receivedMessage.propertyExists("hyphen-property"));
+            assertTrue(receivedMessage.propertyExists("fullstop.property"));
             assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
             assertEquals(1, receivedMessage.getJMSDeliveryMode());
             assertEquals(1, receivedMessage.getJMSPriority());
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index fa0bd7a..ad3febd 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -62,6 +63,7 @@ public class PublishJMSIT {
         Map<String, String> attributes = new HashMap<>();
         attributes.put("foo", "foo");
         attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        attributes.put("test-attribute", "value");
         runner.enqueue("Hey dude!".getBytes(), attributes);
         runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
 
@@ -75,6 +77,7 @@ public class PublishJMSIT {
         assertEquals("Hey dude!", new String(messageBytes));
         assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
         assertEquals("foo", message.getStringProperty("foo"));
+        assertNull(message.getStringProperty("test-attribute"));
 
         runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
     }
@@ -253,4 +256,50 @@ public class PublishJMSIT {
 
         runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
     }
+
+    @Test(timeout = 10000)
+    public void validateRegexAndIllegalHeaders() throws Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationName = "validatePublishTextMessage";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+        runner.setProperty(PublishJMS.MESSAGE_BODY, "text");
+        runner.setProperty(PublishJMS.ATTRIBUTES_AS_HEADERS_REGEX, 
"^((?!bar).)*$");
+        runner.setProperty(PublishJMS.ALLOW_ILLEGAL_HEADER_CHARS, "true");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put("bar", "bar");
+        attributes.put("test-header-with-hyphen", "value");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false);
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        Message message = jmst.receive(destinationName);
+        assertTrue(message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+        assertEquals("value", 
message.getStringProperty("test-header-with-hyphen"));
+        assertNull(message.getStringProperty("bar"));
+
+        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
+    }
 }

Reply via email to