Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-07-20 Thread via GitHub


umarhussain15 commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2241299314

   Hi @exceptionfactory,
   I have applied your suggestions and also removed the precedence option from 
the processor. 
   I have also updated the additional documentation to describe the headers 
source setting of the processor.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-07-20 Thread via GitHub


umarhussain15 commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1685537813


##
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##
@@ -96,13 +96,44 @@ public class PublishAMQP extends 
AbstractAMQPProcessor {
 
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
-
+public static final PropertyDescriptor HEADERS_SOURCE = new 
PropertyDescriptor.Builder()
+.name("Headers Source")
+.displayName("Headers Source")
+.description(
+"The source of the headers which will be put in the 
published message. They can come either from the processor property as a string 
or they can be " +
+"picked from flow file attributes based on Regex 
expression.")
+.required(true)
+.allowableValues(InputHeaderSource.getAllowedValues())
+.defaultValue(InputHeaderSource.STRING)
+.build();
+public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new 
PropertyDescriptor.Builder()
+.name("Headers Source Precedence")

Review Comment:
   I have removed this option now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-07-08 Thread via GitHub


exceptionfactory commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1669267566


##
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##
@@ -85,9 +85,9 @@ public class PublishAMQP extends 
AbstractAMQPProcessor {
 
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 .addValidator(Validator.VALID)
 .build();
-
 public static final PropertyDescriptor ROUTING_KEY = new 
PropertyDescriptor.Builder()
 .name("Routing Key")
+.displayName("Routing Key")

Review Comment:
   ```suggestion
   ```



##
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##
@@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile, 
ProcessSession session) {
 }
 
 
-private void updateBuilderFromAttribute(final FlowFile flowFile, final 
String attribute, final Consumer updater) {
-final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX 
+ attribute);
+/**
+ * Reads an attribute from flowFile and pass it to the consumer function
+ *
+ * @param flowFileFlowFile for reading the attribute
+ * @param attributeKeyName of the attribute
+ * @param updater Consumer function which will use the attribute 
value
+ */
+private void readAmqpAttribute(final FlowFile flowFile, final String 
attributeKey, final Consumer updater) {
+final String attributeValue = flowFile.getAttribute(attributeKey);
 if (attributeValue == null) {
 return;
 }
 
 try {
 updater.accept(attributeValue);
 } catch (final Exception e) {
-getLogger().warn("Failed to update AMQP Message Property {}", 
attribute, e);
+getLogger().warn("Failed to update AMQP Message Property [{}]", 
attributeKey, e);
 }
 }
 
 /**
  * Extracts AMQP properties from the {@link FlowFile} attributes. 
Attributes
  * extracted from {@link FlowFile} are considered candidates for AMQP
- * properties if their names are prefixed with
- * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
+ * properties if their names are prefixed with "amq$" (e.g., 
amqp$contentType=text/xml).
  */
-private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile, Character headerSeparator) {
+private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character 
separator, Pattern pattern) {
 final AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
 
-updateBuilderFromAttribute(flowFile, "contentType", 
builder::contentType);
-updateBuilderFromAttribute(flowFile, "contentEncoding", 
builder::contentEncoding);
-updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
-updateBuilderFromAttribute(flowFile, "priority", pri -> 
builder.priority(Integer.parseInt(pri)));
-updateBuilderFromAttribute(flowFile, "correlationId", 
builder::correlationId);
-updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
-updateBuilderFromAttribute(flowFile, "expiration", 
builder::expiration);
-updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
-updateBuilderFromAttribute(flowFile, "timestamp", ts -> 
builder.timestamp(new Date(Long.parseLong(ts;
-updateBuilderFromAttribute(flowFile, "type", builder::type);
-updateBuilderFromAttribute(flowFile, "userId", builder::userId);
-updateBuilderFromAttribute(flowFile, "appId", builder::appId);
-updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, 
builder::contentEncoding);
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> 
builder.priority(Integer.parseInt(pri)));
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
+readAmqpAttribute(flowFile, 
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
+

Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-21 Thread via GitHub


umarhussain15 commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2122142231

   Checkstyle reported issues fixed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-17 Thread via GitHub


umarhussain15 commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2117684394

   Reverted the AbstractAMQPProcessor change to support expression language in 
password field.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-17 Thread via GitHub


umarhussain15 commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2117511390

   Thanks for the explanation, @exceptionfactory. I will revert the change here 
then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-14 Thread via GitHub


exceptionfactory commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2111054120

   > @exceptionfactory @joewitt
   > 
   > I have added one more change, which will allow users to set amqp 
processor's password via env variables. I think this will be helpful in cases 
where we use AMQP processors in multiple NiFI instances. Instead of copying and 
pasting different passwords in processor properties, we can reference the env 
variable via its name. Let me know your thoughts on it. Thanks.
   
   Thanks for calling out this particular change @umarhussain15. Given the 
scope of the current pull request, I recommend considering that change 
separately.
   
   In general, sensitive properties such as passwords do not support reading 
from environment variables. The reason is that environment variables are also 
accessible through Expression Language, so they are not necessarily secure. The 
use case is still supported through Parameter Contexts and Parameter Providers, 
where the Environment Variable Parameter Provider can be used. This is an 
additional layer of indirection, but it follows the general convention for 
other sensitive properties.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-14 Thread via GitHub


umarhussain15 commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2110984083

   @exceptionfactory @joewitt 
   
   I have added one more change, which will allow users to set amqp processor's 
password via env variables.
   I think this will be helpful in cases where we use AMQP processors in 
multiple NiFI instances. Instead of copying and pasting different passwords in 
processor properties, we can reference the env variable via its name.
   Let me know your thoughts on it.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-12 Thread via GitHub


umarhussain15 commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2106217932

   Hi @joewitt @exceptionfactory,
   I have reverted the changes of making class level variables based on 
processor properties. 
   Also, the branch is rebased on latest changes from main


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2024-05-04 Thread via GitHub


joewitt commented on PR #8105:
URL: https://github.com/apache/nifi/pull/8105#issuecomment-2094426305

   @umarhussain15 Can you please rebase and consider the feedback shared from 
@exceptionfactory ?
   
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2023-12-27 Thread via GitHub


umarhussain15 commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1437011582


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -191,6 +204,19 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 objectMapper = new ObjectMapper();
 }
 
+@OnScheduled
+public void onScheduled(ProcessContext context) {
+super.onScheduled(context);
+batchSize = context.getProperty(BATCH_SIZE).asInteger();
+queueName = context.getProperty(QUEUE).getValue();
+headerFormat = context.getProperty(HEADER_FORMAT).getValue();
+headerAttributePrefix = 
context.getProperty(HEADER_KEY_PREFIX).getValue();
+removeCurlyBraces=context.getProperty(REMOVE_CURLY_BRACES).asBoolean();
+valueSeparatorForHeaders = 
context.getProperty(HEADER_SEPARATOR).getValue();
+autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
+prefetchCount =  context.getProperty(PREFETCH_COUNT).asInteger();

Review Comment:
   I moved these to class level variables since they are only evaluated when 
processor is scheduled and will not change during `onTrigger`. Most of them 
don't have `ExpressionLanguageScope.FLOWFILE_ATTRIBUTES` or only have 
`ExpressionLanguageScope.ENVIRONMENT`.
   But I can move them back to be method local, if they don't provide any 
benefit performance wise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2023-12-27 Thread via GitHub


umarhussain15 commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1437008408


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##
@@ -58,36 +59,42 @@
 + "that happens you will see a log in both app-log and bulletin stating to 
that effect, and the FlowFile will be routed to the 'failure' relationship.")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 @ReadsAttributes({
-@ReadsAttribute(attribute = "amqp$appId", description = "The App ID field 
to set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$contentEncoding", description = "The 
Content Encoding to set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$contentType", description = "The Content 
Type to set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$headers", description = "The headers to 
set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$deliveryMode", description = "The 
numeric indicator for the Message's Delivery Mode"),
-@ReadsAttribute(attribute = "amqp$priority", description = "The Message 
priority"),
-@ReadsAttribute(attribute = "amqp$correlationId", description = "The 
Message's Correlation ID"),
-@ReadsAttribute(attribute = "amqp$replyTo", description = "The value of 
the Message's Reply-To field"),
-@ReadsAttribute(attribute = "amqp$expiration", description = "The Message 
Expiration"),
-@ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID 
of the Message"),
-@ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp 
of the Message, as the number of milliseconds since epoch"),
-@ReadsAttribute(attribute = "amqp$type", description = "The type of 
message"),
-@ReadsAttribute(attribute = "amqp$userId", description = "The ID of the 
user"),
-@ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the 
AMQP Cluster"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, 
description = "The App ID field to set on the AMQP Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The 
Content Encoding to set on the AMQP Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content 
Type to set on the AMQP Message"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, 
description = "The headers to set on the AMQP Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric 
indicator for the Message's Delivery Mode"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, 
description = "The Message priority"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The 
Message's Correlation ID"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, 
description = "The value of the Message's Reply-To field"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message 
Expiration"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID 
of the Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of 
the Message, as the number of milliseconds since epoch"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, 
description = "The type of message"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, 
description = "The ID of the user"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the 
AMQP Cluster"),
 })
 public class PublishAMQP extends AbstractAMQPProcessor {
-private static final String ATTRIBUTES_PREFIX = "amqp$";
 
+public static final AllowableValue HEADERS_FROM_ATTRIBUTES = new 
AllowableValue("headersFromAttributes", "Attributes Matching Regex",
+"Select attributes based on regex pattern to put in rabbitmq 
headers. Key of the attribute will be used as header key");
+public static final AllowableValue HEADERS_FROM_STRING = new 
AllowableValue("headersFromString", "Attribute 'amp$headers' Value",
+"Prepare headers from 'amp$headers' attribute string");
+public static final AllowableValue HEADERS_FROM_BOTH = new 
AllowableValue("headersFromBoth", "Regex Match And 'amp$headers' Value",
+"Take headers from both sources: 'amp$headers' attribute and 
attributes matching Regex. In case of key duplication precedence property will 
define which value to take.");
 public static final PropertyDescriptor EXCHANGE = new 
PropertyDescriptor.Builder()
-.name("Exchange Name")
+ 

Re: [PR] NIFI-12411: Update PublishAMQP to read AMQP headers value from FlowFile attributes and `amq$headers` string [nifi]

2023-12-26 Thread via GitHub


exceptionfactory commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1436675089


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -191,6 +204,19 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 objectMapper = new ObjectMapper();
 }
 
+@OnScheduled
+public void onScheduled(ProcessContext context) {
+super.onScheduled(context);
+batchSize = context.getProperty(BATCH_SIZE).asInteger();
+queueName = context.getProperty(QUEUE).getValue();
+headerFormat = context.getProperty(HEADER_FORMAT).getValue();
+headerAttributePrefix = 
context.getProperty(HEADER_KEY_PREFIX).getValue();
+removeCurlyBraces=context.getProperty(REMOVE_CURLY_BRACES).asBoolean();
+valueSeparatorForHeaders = 
context.getProperty(HEADER_SEPARATOR).getValue();
+autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
+prefetchCount =  context.getProperty(PREFETCH_COUNT).asInteger();

Review Comment:
   Is there a particular reason for moving these properties to class member 
variables? In general, it is best to keep that as method-local variables unless 
there is some other reason to move them around.



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##
@@ -121,30 +159,48 @@ public class PublishAMQP extends 
AbstractAMQPProcessor {
 
 private final static Set relationships;
 
+private String selectedHeaderSource;
+private String headerSourcePrecedence;
+private Character headerSeparator;

Review Comment:
   See note on relocating property values.



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##
@@ -58,36 +59,42 @@
 + "that happens you will see a log in both app-log and bulletin stating to 
that effect, and the FlowFile will be routed to the 'failure' relationship.")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 @ReadsAttributes({
-@ReadsAttribute(attribute = "amqp$appId", description = "The App ID field 
to set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$contentEncoding", description = "The 
Content Encoding to set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$contentType", description = "The Content 
Type to set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$headers", description = "The headers to 
set on the AMQP Message"),
-@ReadsAttribute(attribute = "amqp$deliveryMode", description = "The 
numeric indicator for the Message's Delivery Mode"),
-@ReadsAttribute(attribute = "amqp$priority", description = "The Message 
priority"),
-@ReadsAttribute(attribute = "amqp$correlationId", description = "The 
Message's Correlation ID"),
-@ReadsAttribute(attribute = "amqp$replyTo", description = "The value of 
the Message's Reply-To field"),
-@ReadsAttribute(attribute = "amqp$expiration", description = "The Message 
Expiration"),
-@ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID 
of the Message"),
-@ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp 
of the Message, as the number of milliseconds since epoch"),
-@ReadsAttribute(attribute = "amqp$type", description = "The type of 
message"),
-@ReadsAttribute(attribute = "amqp$userId", description = "The ID of the 
user"),
-@ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the 
AMQP Cluster"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, 
description = "The App ID field to set on the AMQP Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The 
Content Encoding to set on the AMQP Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content 
Type to set on the AMQP Message"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, 
description = "The headers to set on the AMQP Message"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric 
indicator for the Message's Delivery Mode"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, 
description = "The Message priority"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The 
Message's Correlation ID"),
+@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, 
description = "The value of the Message's Reply-To field"),
+@ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message 
Expiration"),
+@ReadsAttribute(attribute =