This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 9b9dd4bae3be28057ada9fcfeca9f8e3efb79abd
Author: annanys23 <anna...@gmail.com>
AuthorDate: Fri Sep 8 16:25:55 2023 +0000

    NIFI-6721: This closes #7789. jms_expiration attribute problem fix
    
    Originally authored in part by sjyang18 <ilson...@hotmail.com>
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../apache/nifi/jms/processors/JMSPublisher.java   | 23 ++++++++++++--
 .../org/apache/nifi/jms/processors/PublishJMS.java |  2 +-
 .../jms/processors/JMSPublisherConsumerIT.java     | 36 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 67a49a9dd6..d5892f013b 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.jms.processors;
 
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.nifi.logging.ComponentLog;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
@@ -28,6 +29,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -79,8 +81,25 @@ class JMSPublisher extends JMSWorker {
                         
this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
                         this.jmsTemplate.setExplicitQosEnabled(true);
                     } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
-                        
this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue()));
-                        this.jmsTemplate.setExplicitQosEnabled(true);
+                        if(NumberUtils.isCreatable(entry.getValue())) { 
//ignore any non-numeric values
+                            long expiration = Long.parseLong(entry.getValue());
+                            long ttl = 0L;
+
+                            // if expiration was set to a positive non-zero 
value, then calculate the ttl
+                            // jmsTemplate does not have an expiration field, 
and can only accept a ttl value
+                            // which is then used to set jms_expiration header
+                            // expiration is in epoch millis; ttl is the remaining lifetime in millis
+                            if(expiration > 0) {
+                                ttl = expiration - 
Instant.now().toEpochMilli();
+                                if(ttl > 0) {
+                                    this.jmsTemplate.setTimeToLive(ttl);
+                                    
this.jmsTemplate.setExplicitQosEnabled(true);
+                                } // else, use default ttl
+                            } else if (expiration == 0) { // expiration == 0 
means no expiration in jms
+                                this.jmsTemplate.setTimeToLive(0); //ttl must 
be explicitly set to 0 to indicate no expiration
+                                this.jmsTemplate.setExplicitQosEnabled(true);
+                            } // else, use default ttl
+                        }
                     } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) {
                         
this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue()));
                         this.jmsTemplate.setExplicitQosEnabled(true);
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 8fea7dd663..1e37f6aea1 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -85,7 +85,7 @@ import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceE
         + "FlowFile attributes will be added as JMS headers and/or properties 
to the outgoing JMS message.")
 @ReadsAttributes({
         @ReadsAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = 
"This attribute becomes the JMSDeliveryMode message header. Must be an 
integer."),
-        @ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This 
attribute becomes the JMSExpiration message header. Must be an integer."),
+        @ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This 
attribute becomes the JMSExpiration message header. Must be a long."),
         @ReadsAttribute(attribute = JmsHeaders.PRIORITY, description = "This 
attribute becomes the JMSPriority message header. Must be an integer."),
         @ReadsAttribute(attribute = JmsHeaders.REDELIVERED, description = 
"This attribute becomes the JMSRedelivered message header."),
         @ReadsAttribute(attribute = JmsHeaders.TIMESTAMP, description = "This 
attribute becomes the JMSTimestamp message header. Must be a long."),
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index e78734ac15..36dd3828e7 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -41,6 +41,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -261,6 +262,41 @@ public class JMSPublisherConsumerIT {
         }
     }
 
+    @Test
+    public void validateNIFI6721() throws Exception {
+
+        final String destinationName = "validateNIFI6721";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+        try {
+            ComponentLog mockLog = mock(ComponentLog.class);
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mockLog);
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value 
expected to be long, make sure non-long doesn't cause problems
+            publisher.publish(destinationName, "hellomq-0".getBytes(), 
flowFileAttributes);
+            Message receivedMessage = jmsTemplate.receive(destinationName);
+            assertEquals(0, receivedMessage.getJMSExpiration());
+
+            long expiration = Instant.now().toEpochMilli() + 1000 * 120;
+            flowFileAttributes.put(JmsHeaders.EXPIRATION, 
Long.toString(expiration));
+            publisher.publish(destinationName, "hellomq-1".getBytes(), 
flowFileAttributes);
+            receivedMessage = jmsTemplate.receive(destinationName);
+            assertEquals(expiration, receivedMessage.getJMSExpiration());
+
+            flowFileAttributes.put(JmsHeaders.EXPIRATION, "-1");
+            publisher.publish(destinationName, "hellomq-3".getBytes(), 
flowFileAttributes);
+            receivedMessage = jmsTemplate.receive(destinationName);
+            assertTrue(receivedMessage.getJMSExpiration() > 0);
+
+            flowFileAttributes.put(JmsHeaders.EXPIRATION, "0");
+            publisher.publish(destinationName, "hellomq-2".getBytes(), 
flowFileAttributes);
+            //assertEquals(mockLog.getWarnMessages().size(), 0);
+            receivedMessage = jmsTemplate.receive(destinationName);
+            assertEquals(0, receivedMessage.getJMSExpiration());
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
     /**
      * At the moment the only two supported message types are TextMessage and
      * BytesMessage which is sufficient for the type if JMS use cases NiFi is

Reply via email to