[ 
https://issues.apache.org/jira/browse/NIFI-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213514#comment-15213514
 ] 

ASF GitHub Bot commented on NIFI-1686:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/305#discussion_r57530817
  
    --- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
    @@ -195,29 +193,85 @@ public void process(final InputStream in) throws IOException {
          * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
          * extracted from {@link FlowFile} are considered candidates for AMQP
          * properties if their names are prefixed with
    -     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
    +     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml).
    +     * The header property is an exception and requires a {@link Map}, so should
    +     * be passed in the following format: amqp$headers=key$value$key$value etc.
          */
         private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
             Map<String, String> attributes = flowFile.getAttributes();
             AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
             for (Entry<String, String> attributeEntry : attributes.entrySet()) {
                 if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
    -                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
    +                String amqpPropName = attributeEntry.getKey();
                     String amqpPropValue = attributeEntry.getValue();
    -                try {
    -                    if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
    -                        Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class);
    -                        m.invoke(builder, amqpPropValue);
    -                    } else {
    -                        getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore.");
    +
    +                AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName);
    +
    +                if (propertyNames != null) {
    +                    try {
    +                        switch (propertyNames){
    +                            case CONTENT_TYPE:
    +                                builder.contentType(amqpPropValue);
    +                                break;
    +                            case CONTENT_ENCODING:
    +                                builder.contentEncoding(amqpPropValue);
    +                                break;
    +                            case HEADERS:
    +                                String[] s = amqpPropValue.split("\\" + AMQPUtils.AMQP_PROP_DELIMITER);
    +                                Map<String, Object> headers = new HashMap<>();
    +
    +                                for (int i = 0; i < s.length ; i += 2){
    +                                    if (i + 2 <= s.length){
    +                                        headers.put(s[i], s[i + 1]);
    +                                    }
    +                                }
    +
    +                                builder.headers(headers);
    +                                break;
    +                            case DELIVERY_MODE:
    +                                builder.deliveryMode(Integer.parseInt(amqpPropValue));
    +                                break;
    +                            case PRIORITY:
    +                                builder.priority(Integer.parseInt(amqpPropValue));
    +                                break;
    +                            case CORRELATION_ID:
    +                                builder.correlationId(amqpPropValue);
    +                                break;
    +                            case REPLY_TO:
    +                                builder.replyTo(amqpPropValue);
    +                                break;
    +                            case EXPIRATION:
    +                                builder.expiration(amqpPropValue);
    +                                break;
    +                            case MESSAGE_ID:
    +                                builder.messageId(amqpPropValue);
    +                                break;
    +                            case TIMESTAMP:
    +                                builder.timestamp(new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(amqpPropValue));
    --- End diff --
    
    I see that the assumption here is that the String representation of the Date
    will always arrive as "yyyy/MM/dd HH:mm:ss". There are several issues with that:
    1. As with DELIVERY_MODE and PRIORITY, the value could be null, resulting
       in an NPE.
    2. The value could be a String that cannot be parsed as a date, resulting in
       a ParseException.
    3. The value could be parseable but in a different format. This is the worst
       case because, unlike the previous two, parsing succeeds and silently
       produces the wrong Date (try the Month/Day/Year format primarily used in
       the US).
    
    Hopefully you understand now why we opted out of supporting certain 
properties in the initial implementation. 
    
    So we definitely need to give this more thought, and I am curious to hear
    any ideas you have. One thing to consider is a NiFi-wide String
    representation of Date, with validation, documentation, etc., to avoid the
    issues described above.
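
A minimal sketch of how the three cases above could be guarded against for the timestamp attribute, assuming the "yyyy/MM/dd HH:mm:ss" pattern from the diff. The class and method names (TimestampAttributeParser, parseTimestamp) are hypothetical and not part of NiFi. Strict (non-lenient) parsing rejects out-of-range fields such as a month of 28, which covers a typical Month/Day/Year input with a four-digit year, but it cannot detect every ambiguous value.

```java
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimestampAttributeParser {

    // Assumed pattern, taken from the diff under review; not a NiFi-wide standard.
    static final String PATTERN = "yyyy/MM/dd HH:mm:ss";

    /**
     * Returns the parsed Date, or null when the value is missing, cannot be
     * parsed, or does not strictly match PATTERN. The caller decides whether
     * a null result means "skip the property" or "route to failure".
     */
    public static Date parseTimestamp(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null; // case 1: no NPE on a missing value
        }
        String trimmed = value.trim();

        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        // Case 3: without this, out-of-range fields (e.g. month 28) are
        // silently rolled over into a different, wrong date.
        format.setLenient(false);

        // Case 2: parse(String, ParsePosition) returns null instead of throwing.
        ParsePosition position = new ParsePosition(0);
        Date parsed = format.parse(trimmed, position);

        // Also reject input with trailing characters after a valid prefix.
        if (parsed == null || position.getIndex() != trimmed.length()) {
            return null;
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(parseTimestamp("2016/03/28 10:15:30")); // expected format
        System.out.println(parseTimestamp("03/28/2016 10:15:30")); // Month/Day/Year input
        System.out.println(parseTimestamp(null));                  // missing attribute
    }
}
```

Returning null keeps the sketch short; a real processor would more likely log a warning or route the FlowFile to a failure relationship, which is a design decision this example does not try to settle.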



> NiFi is unable to populate over 1/4 of AMQP properties from flow properties
> ---------------------------------------------------------------------------
>
>                 Key: NIFI-1686
>                 URL: https://issues.apache.org/jira/browse/NIFI-1686
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 0.5.1
>            Reporter: Stephen Harper
>
> When creating a flow (we used ListenHTTP, but the bug is not specific to it) 
> that forwards on to a RabbitMQ queue, 
> org.apache.nifi.amqp.processors.PublishAMQP uses the method 
> extractAmqpPropertiesFromFlowFile to populate the AMQP BasicProperties if the 
> flow attributes match a certain format (e.g., amqp$contentType=text/xml). 
> The method in question uses reflection to find a matching method name from 
> the AMQP.BasicProperties class, and tries to populate accordingly.
> This works fine for all properties that take a String argument - however 
> there are some that don't (specifically, headers takes a Map<String, Object>, 
> deliveryMode and priority take Integer, and timestamp takes a Date), and it 
> is impossible to populate these values because the invocation assumes a 
> String is required, and fails on line 210.
> What's more, the comment underneath (line 215) states that "this should really 
> never happen since it should be caught by the above IF". However, the author 
> of the code cannot have tested all cases, because this error occurs 
> consistently when forwarding flow attributes for over a quarter of the 
> available AMQP properties.
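
The lookup problem described in the report can be reproduced without the AMQP client library. The sketch below uses a hypothetical stand-in builder (FakeBuilder) with one String setter and one Integer setter, and repeats the original pattern of always asking for a method that takes a single String. The report does not say which exception line 210 of PublishAMQP raises, so the sketch only shows that the lookup cannot succeed for non-String setters.

```java
import java.lang.reflect.Method;

public class ReflectionLookupDemo {

    // Hypothetical stand-in for AMQP.BasicProperties.Builder, reduced to two setters.
    public static class FakeBuilder {
        public String contentType;
        public Integer deliveryMode;

        public FakeBuilder contentType(String value) {
            this.contentType = value;
            return this;
        }

        public FakeBuilder deliveryMode(Integer value) {
            this.deliveryMode = value;
            return this;
        }
    }

    /**
     * Mirrors the pre-patch lookup: the parameter type is hard-coded to String,
     * so only setters declared with a String parameter can be found.
     * Returns true if the setter was found and invoked.
     */
    public static boolean setViaReflection(Object builder, String name, String value) {
        try {
            Method setter = builder.getClass().getDeclaredMethod(name, String.class);
            setter.invoke(builder, value);
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException lands here when the setter takes a non-String type.
            return false;
        }
    }

    public static void main(String[] args) {
        FakeBuilder builder = new FakeBuilder();
        System.out.println(setViaReflection(builder, "contentType", "text/xml"));
        System.out.println(setViaReflection(builder, "deliveryMode", "2"));
    }
}
```

An explicit switch over the known property names, as in the diff under review, avoids the problem because each case converts the String attribute to the type its setter expects.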



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
