exceptionfactory commented on code in PR #8105: URL: https://github.com/apache/nifi/pull/8105#discussion_r1436675089
########## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ########## @@ -191,6 +204,19 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { objectMapper = new ObjectMapper(); } + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + batchSize = context.getProperty(BATCH_SIZE).asInteger(); + queueName = context.getProperty(QUEUE).getValue(); + headerFormat = context.getProperty(HEADER_FORMAT).getValue(); + headerAttributePrefix = context.getProperty(HEADER_KEY_PREFIX).getValue(); + removeCurlyBraces=context.getProperty(REMOVE_CURLY_BRACES).asBoolean(); + valueSeparatorForHeaders = context.getProperty(HEADER_SEPARATOR).getValue(); + autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(); + prefetchCount = context.getProperty(PREFETCH_COUNT).asInteger(); Review Comment: Is there a particular reason for moving these properties to class member variables? In general, it is best to keep them as method-local variables unless there is some other reason to move them around. ########## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -121,30 +159,48 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { private final static Set<Relationship> relationships; + private String selectedHeaderSource; + private String headerSourcePrecedence; + private Character headerSeparator; Review Comment: See note on relocating property values. 
########## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -58,36 +59,42 @@ + "that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.") @SystemResourceConsideration(resource = SystemResource.MEMORY) @ReadsAttributes({ - @ReadsAttribute(attribute = "amqp$appId", description = "The App ID field to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$contentType", description = "The Content Type to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$headers", description = "The headers to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"), - @ReadsAttribute(attribute = "amqp$priority", description = "The Message priority"), - @ReadsAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"), - @ReadsAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"), - @ReadsAttribute(attribute = "amqp$expiration", description = "The Message Expiration"), - @ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"), - @ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"), - @ReadsAttribute(attribute = "amqp$type", description = "The type of message"), - @ReadsAttribute(attribute = "amqp$userId", description = "The ID of the user"), - @ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, description = "The App ID field to set on the AMQP Message"), + @ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The Content Encoding to set on the AMQP Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content Type to set on the AMQP Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, description = "The headers to set on the AMQP Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric indicator for the Message's Delivery Mode"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, description = "The Message priority"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The Message's Correlation ID"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, description = "The value of the Message's Reply-To field"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message Expiration"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID of the Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of the Message, as the number of milliseconds since epoch"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, description = "The type of message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, description = "The ID of the user"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"), }) public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { - private static final String ATTRIBUTES_PREFIX = "amqp$"; + public static final AllowableValue HEADERS_FROM_ATTRIBUTES = new AllowableValue("headersFromAttributes", "Attributes Matching Regex", + "Select attributes based on 
regex pattern to put in rabbitmq headers. Key of the attribute will be used as header key"); + public static final AllowableValue HEADERS_FROM_STRING = new AllowableValue("headersFromString", "Attribute 'amp$headers' Value", + "Prepare headers from 'amp$headers' attribute string"); + public static final AllowableValue HEADERS_FROM_BOTH = new AllowableValue("headersFromBoth", "Regex Match And 'amp$headers' Value", + "Take headers from both sources: 'amp$headers' attribute and attributes matching Regex. In case of key duplication precedence property will define which value to take."); public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder() - .name("Exchange Name") + .name("exchange.name") + .displayName("Exchange Name") .description("The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). " + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.") .required(true) .defaultValue("") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(Validator.VALID) .build(); - public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder() - .name("Routing Key") + .name("routing.key") Review Comment: Property Names cannot be changed without introducing an automated migration method, or following a deprecation process. In general, the display name and property name should match for new properties, so these property name changes should be reverted. 
########## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java: ########## @@ -58,36 +59,42 @@ + "that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.") @SystemResourceConsideration(resource = SystemResource.MEMORY) @ReadsAttributes({ - @ReadsAttribute(attribute = "amqp$appId", description = "The App ID field to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$contentType", description = "The Content Type to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$headers", description = "The headers to set on the AMQP Message"), - @ReadsAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"), - @ReadsAttribute(attribute = "amqp$priority", description = "The Message priority"), - @ReadsAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"), - @ReadsAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"), - @ReadsAttribute(attribute = "amqp$expiration", description = "The Message Expiration"), - @ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"), - @ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"), - @ReadsAttribute(attribute = "amqp$type", description = "The type of message"), - @ReadsAttribute(attribute = "amqp$userId", description = "The ID of the user"), - @ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, description = "The App ID field to set on the AMQP Message"), + @ReadsAttribute(attribute = 
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The Content Encoding to set on the AMQP Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content Type to set on the AMQP Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, description = "The headers to set on the AMQP Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric indicator for the Message's Delivery Mode"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, description = "The Message priority"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The Message's Correlation ID"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, description = "The value of the Message's Reply-To field"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message Expiration"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID of the Message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of the Message, as the number of milliseconds since epoch"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, description = "The type of message"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, description = "The ID of the user"), + @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"), }) public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> { - private static final String ATTRIBUTES_PREFIX = "amqp$"; + public static final AllowableValue HEADERS_FROM_ATTRIBUTES = new AllowableValue("headersFromAttributes", "Attributes Matching Regex", + "Select attributes based on 
regex pattern to put in rabbitmq headers. Key of the attribute will be used as header key"); + public static final AllowableValue HEADERS_FROM_STRING = new AllowableValue("headersFromString", "Attribute 'amp$headers' Value", + "Prepare headers from 'amp$headers' attribute string"); + public static final AllowableValue HEADERS_FROM_BOTH = new AllowableValue("headersFromBoth", "Regex Match And 'amp$headers' Value", + "Take headers from both sources: 'amp$headers' attribute and attributes matching Regex. In case of key duplication precedence property will define which value to take."); Review Comment: Instead of declaring instances of `AllowableValue`, it would be helpful to define an enum of options that implements `DescribedValue`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org