nandorsoma commented on a change in pull request #5458: URL: https://github.com/apache/nifi/pull/5458#discussion_r757035734
########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ########## @@ -240,20 +254,20 @@ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) { updateBuilderFromAttribute(flowFile, "userId", builder::userId); updateBuilderFromAttribute(flowFile, "appId", builder::appId); updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId); - updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers))); + updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers,headerSeparator))); return builder.build(); } /** * Will validate if provided amqpPropValue can be converted to a {@link Map}. - * Should be passed in the format: amqp$headers=key=value,key=value etc. - * + * Should be passed in the format: amqp$headers=key=value + * @param splitValue is used to split for property * @param amqpPropValue the value of the property * @return {@link Map} if valid otherwise null */ - private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue) { - String[] strEntries = amqpPropValue.split(","); + private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue,Character splitValue) { + String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue))); Review comment: Maybe I don't know something, but why do you use Pattern.quote here, instead of `amqpPropValue.split(String.valueOf(splitValue))`? ########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java ########## @@ -99,6 +101,28 @@ .required(true) .build(); + public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder() + .name("header.separator") + .displayName("Header Separator") + .description("The character that is used to separate key-value for header in String. The value must only one character. " + + "Otherwise it will be skipped and the default header separator(',') will be used." Review comment: Are we sure that it will be skipped? I think because of the validator an error message will be shown for the user and you won't be able to start the processor. Same applies for PublishAMQP processor. ########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java ########## @@ -161,6 +165,138 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { } } + @Test + public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("foo1","bar,bar"); + headersMap.put("foo2","bar,bar"); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|"); + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + Map<String, String> properties = Splitter.on("|").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1)); + Assert.assertEquals(headersMap,properties); + + } + } + @Test + public void validateWithNotValidHeaderSeparatorParameter() { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|,"); + runner.assertNotValid(); + } + + @Test + public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("key1","(bar,bar)"); + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True"); + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)"); + + } + } + + @Test + public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("key1","(bar,bar)"); + headersMap.put("key2","(bar,bar)"); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True"); + runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|"); + + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + Map<String, String> properties = Splitter.on("|").withKeyValueSeparator("=").split(headers); + Assert.assertEquals(headersMap,properties); + + } + } + + @Test + public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map<String, Object> headersMap = new HashMap<>(); + headersMap.put("key1","bar"); + headersMap.put("key2","bar2"); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + Map<String, String> properties = Splitter.on(",").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1)); Review comment: I think you can hardcode the expectation here, it would be more visible what is the expected result. I understand that if get back the initial map, then the header is valid but in the case of that processor the output is a String therefore you can assert that. What do you think? ########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java ########## @@ -99,6 +101,28 @@ .required(true) .build(); + public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder() + .name("header.separator") + .displayName("Header Separator") + .description("The character that is used to separate key-value for header in String. The value must only one character. " + + "Otherwise it will be skipped and the default header separator(',') will be used." + + "The value of this parameter must be same to the value of parameter in PublishAMQP, when you use the PublishAMQP processor" ) Review comment: I think this is true in one case from several use cases and because of that maybe it could cause confusion therefore I'd remove that sentence. What do you think? Same applies for PublishAMQP processor. ########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml ########## @@ -21,6 +21,8 @@ language governing permissions and limitations under the License. --> <properties> <amqp-client.version>5.8.0</amqp-client.version> + <common-text.version>1.8</common-text.version> Review comment: Is common-text is used somewhere? I don't find it. ########## File path: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java ########## @@ -980,4 +980,35 @@ public ValidationResult validate(final String subject, final String value, final return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); } } + + public static final Validator SINGLE_CHAR_VALIDATOR = (subject, input, context) -> { + if (input == null) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Input is null for this property") + .build(); + } + + if (input.isEmpty()) { Review comment: I think you can remove that if, because when the input is empty it will produce the same message that you generate in case of `input.length() != 1`. ########## File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java ########## @@ -192,6 +219,21 @@ private void addAttribute(final Map<String, String> attributes, final String att attributes.put(attributeName, value.toString()); } + private String buildHeaders(Map<String, Object> headers, boolean removeCurlyBraces,Character valueSeparatorForHeaders) { + if (headers == null) { + return null; + } + String headerString = Joiner.on(valueSeparatorForHeaders).withKeyValueSeparator("=").join(headers); + + if (!removeCurlyBraces) { Review comment: I like this approach, much more explicit. Tbh on the first review I didn't even understand how the curly braces come into play, then I realized that it was appended by Map.toString(). (I think that was a mistake in the initial implementation) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org