exceptionfactory commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1669267566


##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -85,9 +85,9 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(Validator.VALID)
             .build();
-
     public static final PropertyDescriptor ROUTING_KEY = new 
PropertyDescriptor.Builder()
             .name("Routing Key")
+            .displayName("Routing Key")

Review Comment:
   ```suggestion
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile, 
ProcessSession session) {
     }
 
 
-    private void updateBuilderFromAttribute(final FlowFile flowFile, final 
String attribute, final Consumer<String> updater) {
-        final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX 
+ attribute);
+    /**
+     * Reads an attribute from flowFile and pass it to the consumer function
+     *
+     * @param flowFile        FlowFile for reading the attribute
+     * @param attributeKey    Name of the attribute
+     * @param updater         Consumer function which will use the attribute 
value
+     */
+    private void readAmqpAttribute(final FlowFile flowFile, final String 
attributeKey, final Consumer<String> updater) {
+        final String attributeValue = flowFile.getAttribute(attributeKey);
         if (attributeValue == null) {
             return;
         }
 
         try {
             updater.accept(attributeValue);
         } catch (final Exception e) {
-            getLogger().warn("Failed to update AMQP Message Property {}", 
attribute, e);
+            getLogger().warn("Failed to update AMQP Message Property [{}]", 
attributeKey, e);
         }
     }
 
     /**
      * Extracts AMQP properties from the {@link FlowFile} attributes. 
Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
-     * properties if their names are prefixed with
-     * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
+     * properties if their names are prefixed with "amq$" (e.g., 
amqp$contentType=text/xml).
      */
-    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile, Character headerSeparator) {
+    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character 
separator, Pattern pattern) {
         final AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
 
-        updateBuilderFromAttribute(flowFile, "contentType", 
builder::contentType);
-        updateBuilderFromAttribute(flowFile, "contentEncoding", 
builder::contentEncoding);
-        updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
-        updateBuilderFromAttribute(flowFile, "priority", pri -> 
builder.priority(Integer.parseInt(pri)));
-        updateBuilderFromAttribute(flowFile, "correlationId", 
builder::correlationId);
-        updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
-        updateBuilderFromAttribute(flowFile, "expiration", 
builder::expiration);
-        updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
-        updateBuilderFromAttribute(flowFile, "timestamp", ts -> 
builder.timestamp(new Date(Long.parseLong(ts))));
-        updateBuilderFromAttribute(flowFile, "type", builder::type);
-        updateBuilderFromAttribute(flowFile, "userId", builder::userId);
-        updateBuilderFromAttribute(flowFile, "appId", builder::appId);
-        updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, 
builder::contentEncoding);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> 
builder.priority(Integer.parseInt(pri)));
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new 
Date(Long.parseLong(ts))));
+        readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, 
builder::type);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
+
+        Map<String, Object> headers = prepareAMQPHeaders(flowFile, 
selectedHeaderSource, headerSourcePrecedence, separator, pattern);
+        builder.headers(headers);
 
         return builder.build();
     }
 
+    /**
+     * Extract AMQP headers from incoming {@link FlowFile} based on selected 
headers source value.
+     *
+     * @param flowFile used to extract headers
+     * @return {@link Map}
+     */
+    private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, String 
selectedHeaderSource, String headerSourcePrecedence, Character headerSeparator, 
Pattern pattern) {

Review Comment:
   This should be changed to take the `HeaderSource` enum instead of the string 
value and all arguments should be declared as `final`.



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADERS_SOURCE = new 
PropertyDescriptor.Builder()
+            .name("Headers Source")
+            .displayName("Headers Source")

Review Comment:
   ```suggestion
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -158,10 +188,21 @@ protected void processResource(final Connection 
connection, final AMQPPublisher
             throw new IllegalArgumentException("Failed to determine 'routing 
key' with provided value '"
                 + context.getProperty(ROUTING_KEY) + "' after evaluating it as 
expression against incoming FlowFile.");
         }
+        String selectedHeaderSource = 
context.getProperty(HEADERS_SOURCE).getValue();

Review Comment:
   This should use `asDescribedValue()` and return `HeaderSource` instead of 
the string value.



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADERS_SOURCE = new 
PropertyDescriptor.Builder()
+            .name("Headers Source")
+            .displayName("Headers Source")
+            .description(
+                    "The source of the headers which will be put in the 
published message. They can come either from the processor property as a string 
or they can be " +
+                            "picked from flow file attributes based on Regex 
expression.")
+            .required(true)
+            .allowableValues(InputHeaderSource.getAllowedValues())
+            .defaultValue(InputHeaderSource.STRING)
+            .build();
+    public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new 
PropertyDescriptor.Builder()

Review Comment:
   ```suggestion
   
       public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new 
PropertyDescriptor.Builder()
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADERS_SOURCE = new 
PropertyDescriptor.Builder()
+            .name("Headers Source")
+            .displayName("Headers Source")
+            .description(
+                    "The source of the headers which will be put in the 
published message. They can come either from the processor property as a string 
or they can be " +
+                            "picked from flow file attributes based on Regex 
expression.")

Review Comment:
   ```suggestion
               .description("The source of the headers which will be applied to 
the published message.")
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -125,26 +156,25 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
         List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(EXCHANGE);
         properties.add(ROUTING_KEY);
+        properties.add(HEADERS_SOURCE);
+        properties.add(HEADERS_SOURCE_PRECEDENCE);
+        properties.add(HEADERS_ATTRIBUTES_REGEX);
         properties.add(HEADER_SEPARATOR);
         properties.addAll(getCommonPropertyDescriptors());
         propertyDescriptors = Collections.unmodifiableList(properties);
-
-        Set<Relationship> rels = new HashSet<>();
-        rels.add(REL_SUCCESS);
-        rels.add(REL_FAILURE);
-        relationships = Collections.unmodifiableSet(rels);
+        relationships = Set.of(REL_SUCCESS, REL_FAILURE);
     }
 
-
     /**
      * Will construct AMQP message by extracting its body from the incoming 
{@link FlowFile}. AMQP Properties will be extracted from the
      * {@link FlowFile} and converted to {@link BasicProperties} to be sent 
along with the message. Upon success the incoming {@link FlowFile} is
      * transferred to 'success' {@link Relationship} and upon failure FlowFile 
is penalized and transferred to the 'failure' {@link Relationship}
      * <br>
-     *
-     * NOTE: Attributes extracted from {@link FlowFile} are considered
-     * candidates for AMQP properties if their names are prefixed with
-     * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml)
+     * <p>
+     * NOTE: Attributes extracted from {@link FlowFile} are considered 
candidates for AMQP properties if their names are prefixed with
+     * "amq$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it depends 
on the value of

Review Comment:
   ```suggestion
        * "amqp$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it 
depends on the value of
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADERS_SOURCE = new 
PropertyDescriptor.Builder()
+            .name("Headers Source")
+            .displayName("Headers Source")
+            .description(
+                    "The source of the headers which will be put in the 
published message. They can come either from the processor property as a string 
or they can be " +
+                            "picked from flow file attributes based on Regex 
expression.")
+            .required(true)
+            .allowableValues(InputHeaderSource.getAllowedValues())

Review Comment:
   ```suggestion
               .allowableValues(InputHeaderSource.class)
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -268,4 +357,43 @@ private Map<String, Object> 
validateAMQPHeaderProperty(String amqpPropValue, Cha
         }
         return headers;
     }
+    public enum InputHeaderSource implements DescribedValue {
+
+        ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
+            "Select attributes based on regex pattern to put in rabbitmq 
headers. Key of the attribute will be used as header key"),
+        STRING("headersFromString", "Attribute 'amp$headers' Value",
+            "Prepare headers from 'amp$headers' attribute string"),
+        BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",

Review Comment:
   The value of `BOTH` is confusing and would not translate well if some new 
value were introduced. It seems like it should be removed, along with the 
Precedence property.



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -268,4 +357,43 @@ private Map<String, Object> 
validateAMQPHeaderProperty(String amqpPropValue, Cha
         }
         return headers;
     }
+    public enum InputHeaderSource implements DescribedValue {
+
+        ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
+            "Select attributes based on regex pattern to put in rabbitmq 
headers. Key of the attribute will be used as header key"),
+        STRING("headersFromString", "Attribute 'amp$headers' Value",
+            "Prepare headers from 'amp$headers' attribute string"),
+        BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",
+            "Take headers from both sources: 'amp$headers' attribute and 
attributes matching Regex. In case of key duplication precedence property will 
define which value to take.");
+
+        private final String value;
+        private final String displayName;
+        private final String description;
+
+        InputHeaderSource(String value, String displayName, String 
description) {
+
+            this.value = value;
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        public static EnumSet<InputHeaderSource> getAllowedValues() {
+            return EnumSet.of(STRING, ATTRIBUTES, BOTH);
+        }
+
+        @Override
+        public String getValue() {
+            return value;

Review Comment:
   ```suggestion
               return name();
   ```



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -268,4 +357,43 @@ private Map<String, Object> 
validateAMQPHeaderProperty(String amqpPropValue, Cha
         }
         return headers;
     }
+    public enum InputHeaderSource implements DescribedValue {
+
+        ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
+            "Select attributes based on regex pattern to put in rabbitmq 
headers. Key of the attribute will be used as header key"),
+        STRING("headersFromString", "Attribute 'amp$headers' Value",
+            "Prepare headers from 'amp$headers' attribute string"),
+        BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",
+            "Take headers from both sources: 'amp$headers' attribute and 
attributes matching Regex. In case of key duplication precedence property will 
define which value to take.");
+
+        private final String value;
+        private final String displayName;
+        private final String description;
+
+        InputHeaderSource(String value, String displayName, String 
description) {

Review Comment:
   The `value` is unnecessary and the `name()` from the enum can be used 
instead.



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADERS_SOURCE = new 
PropertyDescriptor.Builder()
+            .name("Headers Source")
+            .displayName("Headers Source")
+            .description(
+                    "The source of the headers which will be put in the 
published message. They can come either from the processor property as a string 
or they can be " +
+                            "picked from flow file attributes based on Regex 
expression.")
+            .required(true)
+            .allowableValues(InputHeaderSource.getAllowedValues())
+            .defaultValue(InputHeaderSource.STRING)
+            .build();
+    public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new 
PropertyDescriptor.Builder()
+            .name("Headers Source Precedence")

Review Comment:
   It seems like it would be better to remove this property and just prefer one 
option or the other.



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile, 
ProcessSession session) {
     }
 
 
-    private void updateBuilderFromAttribute(final FlowFile flowFile, final 
String attribute, final Consumer<String> updater) {
-        final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX 
+ attribute);
+    /**
+     * Reads an attribute from flowFile and pass it to the consumer function
+     *
+     * @param flowFile        FlowFile for reading the attribute
+     * @param attributeKey    Name of the attribute
+     * @param updater         Consumer function which will use the attribute 
value
+     */
+    private void readAmqpAttribute(final FlowFile flowFile, final String 
attributeKey, final Consumer<String> updater) {
+        final String attributeValue = flowFile.getAttribute(attributeKey);
         if (attributeValue == null) {
             return;
         }
 
         try {
             updater.accept(attributeValue);
         } catch (final Exception e) {
-            getLogger().warn("Failed to update AMQP Message Property {}", 
attribute, e);
+            getLogger().warn("Failed to update AMQP Message Property [{}]", 
attributeKey, e);
         }
     }
 
     /**
      * Extracts AMQP properties from the {@link FlowFile} attributes. 
Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
-     * properties if their names are prefixed with
-     * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
+     * properties if their names are prefixed with "amq$" (e.g., 
amqp$contentType=text/xml).
      */
-    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile, Character headerSeparator) {
+    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character 
separator, Pattern pattern) {
         final AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
 
-        updateBuilderFromAttribute(flowFile, "contentType", 
builder::contentType);
-        updateBuilderFromAttribute(flowFile, "contentEncoding", 
builder::contentEncoding);
-        updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
-        updateBuilderFromAttribute(flowFile, "priority", pri -> 
builder.priority(Integer.parseInt(pri)));
-        updateBuilderFromAttribute(flowFile, "correlationId", 
builder::correlationId);
-        updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
-        updateBuilderFromAttribute(flowFile, "expiration", 
builder::expiration);
-        updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
-        updateBuilderFromAttribute(flowFile, "timestamp", ts -> 
builder.timestamp(new Date(Long.parseLong(ts))));
-        updateBuilderFromAttribute(flowFile, "type", builder::type);
-        updateBuilderFromAttribute(flowFile, "userId", builder::userId);
-        updateBuilderFromAttribute(flowFile, "appId", builder::appId);
-        updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, 
builder::contentEncoding);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> 
builder.priority(Integer.parseInt(pri)));
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new 
Date(Long.parseLong(ts))));
+        readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, 
builder::type);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId);
+        readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
+
+        Map<String, Object> headers = prepareAMQPHeaders(flowFile, 
selectedHeaderSource, headerSourcePrecedence, separator, pattern);
+        builder.headers(headers);
 
         return builder.build();
     }
 
+    /**
+     * Extract AMQP headers from incoming {@link FlowFile} based on selected 
headers source value.
+     *
+     * @param flowFile used to extract headers
+     * @return {@link Map}
+     */
+    private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, String 
selectedHeaderSource, String headerSourcePrecedence, Character headerSeparator, 
Pattern pattern) {
+        final Map<String, Object> headers = new HashMap<>();
+        if 
(InputHeaderSource.ATTRIBUTES.getValue().equals(selectedHeaderSource)) {
+                headers.putAll(attributesToHeaders(flowFile.getAttributes(), 
pattern));
+        } else if 
(InputHeaderSource.STRING.getValue().equals(selectedHeaderSource)) {
+            readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> 
headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
+        } else {
+            // When precedence matches, put values in the last so it can 
override keys from other source
+            if 
(InputHeaderSource.ATTRIBUTES.getValue().equals(headerSourcePrecedence)) {
+                readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> 
headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
+                headers.putAll(attributesToHeaders(flowFile.getAttributes(), 
pattern));
+            } else {
+                headers.putAll(attributesToHeaders(flowFile.getAttributes(), 
pattern));
+                readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> 
headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
+            }
+        }
+        return headers;
+    }
+
+    /**
+     * Matches the pattern to keys of input attributes and output the amqp 
headers map
+     * @param attributes flowFile attributes to scan for match
+     * @return Map with entries matching the pattern
+     */
+    private Map<String, String> attributesToHeaders(Map<String, String> 
attributes, Pattern pattern) {

Review Comment:
   ```suggestion
       private Map<String, String> getMatchedAttributes(final Map<String, 
String> attributes, final Pattern pattern) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to