sedadgn commented on a change in pull request #5458:
URL: https://github.com/apache/nifi/pull/5458#discussion_r757453945



##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
##########
@@ -240,20 +254,20 @@ private BasicProperties 
extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
         updateBuilderFromAttribute(flowFile, "userId", builder::userId);
         updateBuilderFromAttribute(flowFile, "appId", builder::appId);
         updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers)));
+        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers,headerSeparator)));
 
         return builder.build();
     }
 
     /**
      * Will validate if provided amqpPropValue can be converted to a {@link 
Map}.
-     * Should be passed in the format: amqp$headers=key=value,key=value etc.
-     *
+     * Should be passed in the format: amqp$headers=key=value
+     * @param splitValue is used to split for property
      * @param amqpPropValue the value of the property
      * @return {@link Map} if valid otherwise null
      */
-    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue) {
-        String[] strEntries = amqpPropValue.split(",");
+    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue,Character splitValue) {
+        String[] strEntries = 
amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));

Review comment:
       Pattern.quote create the escaped version of the regex representing. 
   For Example '|'. 
   Also you can look at this. 
   
https://stackoverflow.com/questions/10796160/splitting-a-java-string-by-the-pipe-symbol-using-split

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
##########
@@ -161,6 +165,138 @@ public void 
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
         }
     }
 
+    @Test
+    public void 
validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1","bar,bar");
+        headersMap.put("foo2","bar,bar");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
Splitter.on("|").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1));
+            Assert.assertEquals(headersMap,properties);
+
+        }
+    }
+    @Test
+    public void validateWithNotValidHeaderSeparatorParameter() {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+        ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+        TestRunner runner = initTestRunner(proc);
+        runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void 
validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)");
+
+        }
+    }
+
+    @Test
+    public void 
validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        headersMap.put("key2","(bar,bar)");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|");
+
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
Splitter.on("|").withKeyValueSeparator("=").split(headers);
+            Assert.assertEquals(headersMap,properties);
+
+        }
+    }
+
+    @Test
+    public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","bar");
+        headersMap.put("key2","bar2");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
Splitter.on(",").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1));

Review comment:
       I can not. Because there are two elements in the Hashmap. I don't know 
in what order these elements will come.
   
   For HashMap "This class makes no guarantees as to the order of the map; in 
particular, it does not guarantee that the order will remain constant over 
time."




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to