sedadgn commented on a change in pull request #5458: URL: https://github.com/apache/nifi/pull/5458#discussion_r757453945
########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ########## @@ -240,20 +254,20 @@ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) { updateBuilderFromAttribute(flowFile, "userId", builder::userId); updateBuilderFromAttribute(flowFile, "appId", builder::appId); updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId); - updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers))); + updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers,headerSeparator))); return builder.build(); } /** * Will validate if provided amqpPropValue can be converted to a {@link Map}. - * Should be passed in the format: amqp$headers=key=value,key=value etc. - * + * Should be passed in the format: amqp$headers=key=value + * @param splitValue is used to split for property * @param amqpPropValue the value of the property * @return {@link Map} if valid otherwise null */ - private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue) { - String[] strEntries = amqpPropValue.split(","); + private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue,Character splitValue) { + String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue))); Review comment: Pattern.quote create the escaped version of the regex representing. For Example '|'. Also you can look at this. https://stackoverflow.com/questions/10796160/splitting-a-java-string-by-the-pipe-symbol-using-split ########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java ########## @@ -161,6 +165,138 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { } } + @Test + public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("foo1","bar,bar"); + headersMap.put("foo2","bar,bar"); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|"); + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + Map<String, String> properties = Splitter.on("|").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1)); + Assert.assertEquals(headersMap,properties); + + } + } + @Test + public void validateWithNotValidHeaderSeparatorParameter() { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|,"); + runner.assertNotValid(); + } + + @Test + public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("key1","(bar,bar)"); + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True"); + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)"); + + } + } + + @Test + public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("key1","(bar,bar)"); + headersMap.put("key2","(bar,bar)"); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True"); + runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|"); + + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + Map<String, String> properties = Splitter.on("|").withKeyValueSeparator("=").split(headers); + Assert.assertEquals(headersMap,properties); + + } + } + + @Test + public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("key1","bar"); + headersMap.put("key2","bar2"); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + Map<String, String> properties = Splitter.on(",").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1)); Review comment: I can not. Because there are two elements in the Hashmap. I don't know in what order these elements will come. For HashMap "This class makes no guarantees as to the order of the map; in particular, it does not guarantee that the order will remain constant over time." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org