[ https://issues.apache.org/jira/browse/NIFI-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213542#comment-15213542 ]
ASF GitHub Bot commented on NIFI-1686: -------------------------------------- Github user petmit commented on a diff in the pull request: https://github.com/apache/nifi/pull/305#discussion_r57531722 --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java --- @@ -195,29 +193,85 @@ public void process(final InputStream in) throws IOException { * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes * extracted from {@link FlowFile} are considered candidates for AMQP * properties if their names are prefixed with - * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml) + * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml). + * The header property is an exception and requires a {@link Map}, so should + * be passed in the following format: amqp$headers=key$value$key$value etc. */ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) { Map<String, String> attributes = flowFile.getAttributes(); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); for (Entry<String, String> attributeEntry : attributes.entrySet()) { if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) { - String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1]; + String amqpPropName = attributeEntry.getKey(); String amqpPropValue = attributeEntry.getValue(); - try { - if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) { - Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class); - m.invoke(builder, amqpPropValue); - } else { - getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore."); + + AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName); + + if (propertyNames != null) { + try { + switch (propertyNames){ + case CONTENT_TYPE: + builder.contentType(amqpPropValue); + break; + case CONTENT_ENCODING: + builder.contentEncoding(amqpPropValue); + break; + case HEADERS: + String[] s = amqpPropValue.split("\\" + AMQPUtils.AMQP_PROP_DELIMITER); + Map<String, Object> headers = new HashMap<>(); + + for (int i = 0; i < s.length ; i += 2){ + if (i + 2 <= s.length){ + headers.put(s[i], s[i + 1]); + } + } + + builder.headers(headers); + break; --- End diff -- Another option is to consider all attributes with the 'amqp$' prefix which aren't valid AMQP properties as candidates for property headers. E.g. a FlowFile attribute 'amqp$attr=value' would be added to the headers map as 'attr=value'. > NiFi is unable to populate over 1/4 of AMQP properties from flow properties > --------------------------------------------------------------------------- > > Key: NIFI-1686 > URL: https://issues.apache.org/jira/browse/NIFI-1686 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework > Affects Versions: 0.5.1 > Reporter: Stephen Harper > > When creating a flow (we used ListenHTTP, but this bug will affect all) that > forwards on to a rabbit queue, org.apache.nifi.amqp.processors.PublishAMQP > uses the method extractAmqpPropertiesFromFlowFile to populate the AMQP > BasicProperties if the flow attributes match a certain format (i.e > amqp$contentType=text/xml). > The method in question uses reflection to find a matching method name from > the AMQP.BasicProperties class, and tries to populate accordingly. > This works fine for all properties that take a String argument - however > there are some that don't (specifically, headers takes a Map<String, Object>, > deliveryMode and priority take Integer, and timestamp takes a Date), and it > is impossible to populate these values because the invocation assumes a > String is required, and fails on line 210. > Whatsmore, the comment underneath (line 215) states that "this should really > never happen since it should be caught by the above IF" - however the author > of the code mustn't have tested all cases because this error is consistently > present when trying to forward flow attributes in over a quarter of the > available amqp properties. -- This message was sent by Atlassian JIRA (v6.3.4#6332)