nandorsoma commented on a change in pull request #5458:
URL: https://github.com/apache/nifi/pull/5458#discussion_r757035734



##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
##########
@@ -240,20 +254,20 @@ private BasicProperties 
extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
         updateBuilderFromAttribute(flowFile, "userId", builder::userId);
         updateBuilderFromAttribute(flowFile, "appId", builder::appId);
         updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers)));
+        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers,headerSeparator)));
 
         return builder.build();
     }
 
     /**
      * Will validate if provided amqpPropValue can be converted to a {@link 
Map}.
-     * Should be passed in the format: amqp$headers=key=value,key=value etc.
-     *
+     * Should be passed in the format: amqp$headers=key=value
+     * @param splitValue is used to split for property
      * @param amqpPropValue the value of the property
      * @return {@link Map} if valid otherwise null
      */
-    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue) {
-        String[] strEntries = amqpPropValue.split(",");
+    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue,Character splitValue) {
+        String[] strEntries = 
amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));

Review comment:
       Maybe I'm missing something, but why do you use Pattern.quote here 
instead of `amqpPropValue.split(String.valueOf(splitValue))`?

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
##########
@@ -99,6 +101,28 @@
         .required(true)
         .build();
 
+    public static final PropertyDescriptor HEADER_SEPARATOR = new 
PropertyDescriptor.Builder()
+            .name("header.separator")
+            .displayName("Header Separator")
+            .description("The character that is used to separate key-value for 
header in String. The value must only one character. "
+                    + "Otherwise it will be skipped and the default header 
separator(',') will be used."

Review comment:
       Are we sure that it will be skipped? I think that, because of the 
validator, an error message will be shown to the user and you won't be able to 
start the processor. The same applies to the PublishAMQP processor.

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
##########
@@ -161,6 +165,138 @@ public void 
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
         }
     }
 
+    @Test
+    public void 
validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1","bar,bar");
+        headersMap.put("foo2","bar,bar");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
Splitter.on("|").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1));
+            Assert.assertEquals(headersMap,properties);
+
+        }
+    }
+    @Test
+    public void validateWithNotValidHeaderSeparatorParameter() {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+        ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+        TestRunner runner = initTestRunner(proc);
+        runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void 
validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)");
+
+        }
+    }
+
+    @Test
+    public void 
validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        headersMap.put("key2","(bar,bar)");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|");
+
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
Splitter.on("|").withKeyValueSeparator("=").split(headers);
+            Assert.assertEquals(headersMap,properties);
+
+        }
+    }
+
+    @Test
+    public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","bar");
+        headersMap.put("key2","bar2");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
Splitter.on(",").withKeyValueSeparator("=").split(headers.substring(1,headers.length()-1));

Review comment:
       I think you can hardcode the expectation here; that would make the 
expected result more visible. I understand that if you get back the initial 
map, then the header is valid, but in the case of this processor the output is 
a String, so you can assert on that directly. What do you think?

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
##########
@@ -99,6 +101,28 @@
         .required(true)
         .build();
 
+    public static final PropertyDescriptor HEADER_SEPARATOR = new 
PropertyDescriptor.Builder()
+            .name("header.separator")
+            .displayName("Header Separator")
+            .description("The character that is used to separate key-value for 
header in String. The value must only one character. "
+                    + "Otherwise it will be skipped and the default header 
separator(',') will be used."
+                    + "The value of this parameter must be same to the value 
of parameter in PublishAMQP, when you use the PublishAMQP processor" )

Review comment:
       I think this is true for only one of several use cases, so it could 
cause confusion; I'd remove that sentence. What do you think? The same applies 
to the PublishAMQP processor.

##########
File path: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
##########
@@ -21,6 +21,8 @@ language governing permissions and limitations under the 
License. -->
 
     <properties>
         <amqp-client.version>5.8.0</amqp-client.version>
+        <common-text.version>1.8</common-text.version>

Review comment:
       Is common-text used anywhere? I can't find it.

##########
File path: 
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
##########
@@ -980,4 +980,35 @@ public ValidationResult validate(final String subject, 
final String value, final
             return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
         }
     }
+
+    public static final Validator SINGLE_CHAR_VALIDATOR = (subject, input, 
context) -> {
+        if (input == null) {
+            return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(false)
+                    .explanation("Input is null for this property")
+                    .build();
+        }
+
+        if (input.isEmpty()) {

Review comment:
       I think you can remove that `if`, because when the input is empty it 
will produce the same message that you generate in the case of 
`input.length() != 1`.

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
##########
@@ -192,6 +219,21 @@ private void addAttribute(final Map<String, String> 
attributes, final String att
         attributes.put(attributeName, value.toString());
     }
 
+    private String buildHeaders(Map<String, Object> headers,  boolean 
removeCurlyBraces,Character valueSeparatorForHeaders) {
+        if (headers == null) {
+            return null;
+        }
+        String headerString = 
Joiner.on(valueSeparatorForHeaders).withKeyValueSeparator("=").join(headers);
+
+        if (!removeCurlyBraces) {

Review comment:
       I like this approach; it is much more explicit. Tbh, on the first review 
I didn't even understand how the curly braces came into play; then I realized 
that they were added by Map.toString(). (I think that was a mistake in the 
initial implementation.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to