exceptionfactory commented on code in PR #8105: URL: https://github.com/apache/nifi/pull/8105#discussion_r1669267566
########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -85,9 +85,9 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(Validator.VALID) .build(); - public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder() .name("Routing Key") + .displayName("Routing Key") Review Comment: ```suggestion ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile, ProcessSession session) { } - private void updateBuilderFromAttribute(final FlowFile flowFile, final String attribute, final Consumer<String> updater) { - final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX + attribute); + /** + * Reads an attribute from flowFile and pass it to the consumer function + * + * @param flowFile FlowFile for reading the attribute + * @param attributeKey Name of the attribute + * @param updater Consumer function which will use the attribute value + */ + private void readAmqpAttribute(final FlowFile flowFile, final String attributeKey, final Consumer<String> updater) { + final String attributeValue = flowFile.getAttribute(attributeKey); if (attributeValue == null) { return; } try { updater.accept(attributeValue); } catch (final Exception e) { - getLogger().warn("Failed to update AMQP Message Property {}", attribute, e); + getLogger().warn("Failed to update AMQP Message Property [{}]", attributeKey, e); } } /** * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes * extracted from {@link FlowFile} are considered candidates for AMQP - * properties if their names are prefixed with - * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml). 
+ * properties if their names are prefixed with "amq$" (e.g., amqp$contentType=text/xml). */ - private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, Character headerSeparator) { + private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character separator, Pattern pattern) { final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); - updateBuilderFromAttribute(flowFile, "contentType", builder::contentType); - updateBuilderFromAttribute(flowFile, "contentEncoding", builder::contentEncoding); - updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> builder.deliveryMode(Integer.parseInt(mode))); - updateBuilderFromAttribute(flowFile, "priority", pri -> builder.priority(Integer.parseInt(pri))); - updateBuilderFromAttribute(flowFile, "correlationId", builder::correlationId); - updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo); - updateBuilderFromAttribute(flowFile, "expiration", builder::expiration); - updateBuilderFromAttribute(flowFile, "messageId", builder::messageId); - updateBuilderFromAttribute(flowFile, "timestamp", ts -> builder.timestamp(new Date(Long.parseLong(ts)))); - updateBuilderFromAttribute(flowFile, "type", builder::type); - updateBuilderFromAttribute(flowFile, "userId", builder::userId); - updateBuilderFromAttribute(flowFile, "appId", builder::appId); - updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId); - updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers, headerSeparator))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, builder::contentEncoding); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> builder.deliveryMode(Integer.parseInt(mode))); + 
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> builder.priority(Integer.parseInt(pri))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new Date(Long.parseLong(ts)))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, builder::type); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId); + + Map<String, Object> headers = prepareAMQPHeaders(flowFile, selectedHeaderSource, headerSourcePrecedence, separator, pattern); + builder.headers(headers); return builder.build(); } + /** + * Extract AMQP headers from incoming {@link FlowFile} based on selected headers source value. + * + * @param flowFile used to extract headers + * @return {@link Map} + */ + private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character headerSeparator, Pattern pattern) { Review Comment: This should be changed to take the `HeaderSource` enum instead of the string value and all arguments should be declared as `final`. 
########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -96,13 +96,44 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() + .name("Headers Source") + .displayName("Headers Source") Review Comment: ```suggestion ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -158,10 +188,21 @@ protected void processResource(final Connection connection, final AMQPPublisher throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile."); } + String selectedHeaderSource = context.getProperty(HEADERS_SOURCE).getValue(); Review Comment: This should use `asDescribedValue()` and return `HeaderSource` instead of the string value. ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -96,13 +96,44 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() + .name("Headers Source") + .displayName("Headers Source") + .description( + "The source of the headers which will be put in the published message. 
They can come either from the processor property as a string or they can be " + + "picked from flow file attributes based on Regex expression.") + .required(true) + .allowableValues(InputHeaderSource.getAllowedValues()) + .defaultValue(InputHeaderSource.STRING) + .build(); + public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new PropertyDescriptor.Builder() Review Comment: ```suggestion public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new PropertyDescriptor.Builder() ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -96,13 +96,44 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() + .name("Headers Source") + .displayName("Headers Source") + .description( + "The source of the headers which will be put in the published message. 
They can come either from the processor property as a string or they can be " + + "picked from flow file attributes based on Regex expression.") Review Comment: ```suggestion .description("The source of the headers which will be applied to the published message.") ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -125,26 +156,25 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(EXCHANGE); properties.add(ROUTING_KEY); + properties.add(HEADERS_SOURCE); + properties.add(HEADERS_SOURCE_PRECEDENCE); + properties.add(HEADERS_ATTRIBUTES_REGEX); properties.add(HEADER_SEPARATOR); properties.addAll(getCommonPropertyDescriptors()); propertyDescriptors = Collections.unmodifiableList(properties); - - Set<Relationship> rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_FAILURE); - relationships = Collections.unmodifiableSet(rels); + relationships = Set.of(REL_SUCCESS, REL_FAILURE); } - /** * Will construct AMQP message by extracting its body from the incoming {@link FlowFile}. AMQP Properties will be extracted from the * {@link FlowFile} and converted to {@link BasicProperties} to be sent along with the message. Upon success the incoming {@link FlowFile} is * transferred to 'success' {@link Relationship} and upon failure FlowFile is penalized and transferred to the 'failure' {@link Relationship} * <br> - * - * NOTE: Attributes extracted from {@link FlowFile} are considered - * candidates for AMQP properties if their names are prefixed with - * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml) + * <p> + * NOTE: Attributes extracted from {@link FlowFile} are considered candidates for AMQP properties if their names are prefixed with + * "amq$" (e.g., amqp$contentType=text/xml). 
For "amqp$headers" it depends on the value of Review Comment: ```suggestion * "amqp$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it depends on the value of ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -96,13 +96,44 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() + .name("Headers Source") + .displayName("Headers Source") + .description( + "The source of the headers which will be put in the published message. They can come either from the processor property as a string or they can be " + + "picked from flow file attributes based on Regex expression.") + .required(true) + .allowableValues(InputHeaderSource.getAllowedValues()) Review Comment: ```suggestion .allowableValues(InputHeaderSource.class) ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -268,4 +357,43 @@ private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Cha } return headers; } + public enum InputHeaderSource implements DescribedValue { + + ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex", + "Select attributes based on regex pattern to put in rabbitmq headers. Key of the attribute will be used as header key"), + STRING("headersFromString", "Attribute 'amp$headers' Value", + "Prepare headers from 'amp$headers' attribute string"), + BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value", Review Comment: The value of `BOTH` is confusing and would not translate well if some new value were introduced. It seems like it should be removed, along with the Precedence property. 
########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -268,4 +357,43 @@ private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Cha } return headers; } + public enum InputHeaderSource implements DescribedValue { + + ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex", + "Select attributes based on regex pattern to put in rabbitmq headers. Key of the attribute will be used as header key"), + STRING("headersFromString", "Attribute 'amp$headers' Value", + "Prepare headers from 'amp$headers' attribute string"), + BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value", + "Take headers from both sources: 'amp$headers' attribute and attributes matching Regex. In case of key duplication precedence property will define which value to take."); + + private final String value; + private final String displayName; + private final String description; + + InputHeaderSource(String value, String displayName, String description) { + + this.value = value; + this.displayName = displayName; + this.description = description; + } + + public static EnumSet<InputHeaderSource> getAllowedValues() { + return EnumSet.of(STRING, ATTRIBUTES, BOTH); + } + + @Override + public String getValue() { + return value; Review Comment: ```suggestion return name(); ``` ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -268,4 +357,43 @@ private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Cha } return headers; } + public enum InputHeaderSource implements DescribedValue { + + ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex", + "Select attributes based on regex pattern to put in rabbitmq headers. 
Key of the attribute will be used as header key"), + STRING("headersFromString", "Attribute 'amp$headers' Value", + "Prepare headers from 'amp$headers' attribute string"), + BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value", + "Take headers from both sources: 'amp$headers' attribute and attributes matching Regex. In case of key duplication precedence property will define which value to take."); + + private final String value; + private final String displayName; + private final String description; + + InputHeaderSource(String value, String displayName, String description) { Review Comment: The `value` is unnecessary and the `name()` from the enum can be used instead. ########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -96,13 +96,44 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() + .name("Headers Source") + .displayName("Headers Source") + .description( + "The source of the headers which will be put in the published message. They can come either from the processor property as a string or they can be " + + "picked from flow file attributes based on Regex expression.") + .required(true) + .allowableValues(InputHeaderSource.getAllowedValues()) + .defaultValue(InputHeaderSource.STRING) + .build(); + public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new PropertyDescriptor.Builder() + .name("Headers Source Precedence") Review Comment: It seems like it would be better to remove this property and just prefer one option or the other. 
########## nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile, ProcessSession session) { } - private void updateBuilderFromAttribute(final FlowFile flowFile, final String attribute, final Consumer<String> updater) { - final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX + attribute); + /** + * Reads an attribute from flowFile and pass it to the consumer function + * + * @param flowFile FlowFile for reading the attribute + * @param attributeKey Name of the attribute + * @param updater Consumer function which will use the attribute value + */ + private void readAmqpAttribute(final FlowFile flowFile, final String attributeKey, final Consumer<String> updater) { + final String attributeValue = flowFile.getAttribute(attributeKey); if (attributeValue == null) { return; } try { updater.accept(attributeValue); } catch (final Exception e) { - getLogger().warn("Failed to update AMQP Message Property {}", attribute, e); + getLogger().warn("Failed to update AMQP Message Property [{}]", attributeKey, e); } } /** * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes * extracted from {@link FlowFile} are considered candidates for AMQP - * properties if their names are prefixed with - * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml). + * properties if their names are prefixed with "amq$" (e.g., amqp$contentType=text/xml). 
*/ - private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, Character headerSeparator) { + private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character separator, Pattern pattern) { final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); - updateBuilderFromAttribute(flowFile, "contentType", builder::contentType); - updateBuilderFromAttribute(flowFile, "contentEncoding", builder::contentEncoding); - updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> builder.deliveryMode(Integer.parseInt(mode))); - updateBuilderFromAttribute(flowFile, "priority", pri -> builder.priority(Integer.parseInt(pri))); - updateBuilderFromAttribute(flowFile, "correlationId", builder::correlationId); - updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo); - updateBuilderFromAttribute(flowFile, "expiration", builder::expiration); - updateBuilderFromAttribute(flowFile, "messageId", builder::messageId); - updateBuilderFromAttribute(flowFile, "timestamp", ts -> builder.timestamp(new Date(Long.parseLong(ts)))); - updateBuilderFromAttribute(flowFile, "type", builder::type); - updateBuilderFromAttribute(flowFile, "userId", builder::userId); - updateBuilderFromAttribute(flowFile, "appId", builder::appId); - updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId); - updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers, headerSeparator))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, builder::contentEncoding); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> builder.deliveryMode(Integer.parseInt(mode))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> 
builder.priority(Integer.parseInt(pri))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new Date(Long.parseLong(ts)))); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, builder::type); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId); + + Map<String, Object> headers = prepareAMQPHeaders(flowFile, selectedHeaderSource, headerSourcePrecedence, separator, pattern); + builder.headers(headers); return builder.build(); } + /** + * Extract AMQP headers from incoming {@link FlowFile} based on selected headers source value. 
+ * + * @param flowFile used to extract headers + * @return {@link Map} + */ + private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character headerSeparator, Pattern pattern) { + final Map<String, Object> headers = new HashMap<>(); + if (InputHeaderSource.ATTRIBUTES.getValue().equals(selectedHeaderSource)) { + headers.putAll(attributesToHeaders(flowFile.getAttributes(), pattern)); + } else if (InputHeaderSource.STRING.getValue().equals(selectedHeaderSource)) { + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator))); + } else { + // When precedence matches, put values in the last so it can override keys from other source + if (InputHeaderSource.ATTRIBUTES.getValue().equals(headerSourcePrecedence)) { + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator))); + headers.putAll(attributesToHeaders(flowFile.getAttributes(), pattern)); + } else { + headers.putAll(attributesToHeaders(flowFile.getAttributes(), pattern)); + readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator))); + } + } + return headers; + } + + /** + * Matches the pattern to keys of input attributes and output the amqp headers map + * @param attributes flowFile attributes to scan for match + * @return Map with entries matching the pattern + */ + private Map<String, String> attributesToHeaders(Map<String, String> attributes, Pattern pattern) { Review Comment: ```suggestion private Map<String, String> getMatchedAttributes(final Map<String, String> attributes, final Pattern pattern) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org