This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 3678a4bca3 NIFI-6730 AMQP QoS support
3678a4bca3 is described below

commit 3678a4bca3447bcf8f70efc19bbec33894a44f6f
Author: Mikhail Sapozhnikov <masapozhni...@yandex.ru>
AuthorDate: Fri Dec 8 14:38:14 2023 +0300

    NIFI-6730 AMQP QoS support
    
    This closes #8146.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
    (cherry picked from commit 76613a0ed4a90c5e264e0537990278ec9e422536)
---
 .../apache/nifi/amqp/processors/AMQPConsumer.java  |  4 +++-
 .../apache/nifi/amqp/processors/ConsumeAMQP.java   | 15 +++++++++++-
 .../nifi/amqp/processors/AMQPConsumerTest.java     | 28 +++++++++++++++-------
 .../nifi/amqp/processors/ConsumeAMQPTest.java      |  4 +++-
 .../apache/nifi/amqp/processors/TestChannel.java   |  6 ++++-
 5 files changed, 45 insertions(+), 12 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index e11044845c..04f951836a 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -43,7 +43,8 @@ final class AMQPConsumer extends AMQPWorker {
     private final boolean autoAcknowledge;
     private final Consumer consumer;
 
-    AMQPConsumer(final Connection connection, final String queueName, final 
boolean autoAcknowledge, ComponentLog processorLog) throws IOException {
+    AMQPConsumer(final Connection connection, final String queueName, final 
boolean autoAcknowledge, final int prefetchCount,
+            ComponentLog processorLog) throws IOException {
         super(connection, processorLog);
         this.validateStringProperty("queueName", queueName);
         this.queueName = queueName;
@@ -80,6 +81,7 @@ final class AMQPConsumer extends AMQPWorker {
             }
         };
 
+        channel.basicQos(prefetchCount);
         channel.basicConsume(queueName, autoAcknowledge, consumer);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 2a7f68d48a..771db0e32a 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -112,6 +112,17 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         .defaultValue("10")
         .required(true)
         .build();
+    static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+               + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it."
+               + "Allowed values: from 0 to 65535. 0 means no limit")
+        .addValidator(StandardValidators.createLongValidator(0, 65535, true))
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .defaultValue("0")
+        .required(true)
+        .build();
 
     public static final PropertyDescriptor HEADER_FORMAT = new 
PropertyDescriptor.Builder()
         .name("header.format")
@@ -167,6 +178,7 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         properties.add(QUEUE);
         properties.add(AUTO_ACKNOWLEDGE);
         properties.add(BATCH_SIZE);
+        properties.add(PREFETCH_COUNT);
         properties.add(HEADER_FORMAT);
         properties.add(HEADER_KEY_PREFIX);
         properties.add(HEADER_SEPARATOR);
@@ -301,7 +313,8 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         try {
             final String queueName = context.getProperty(QUEUE).getValue();
             final boolean autoAcknowledge = 
context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
-            final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, 
queueName, autoAcknowledge, getLogger());
+            final int prefetchCount =  
context.getProperty(PREFETCH_COUNT).asInteger();
+            final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, 
queueName, autoAcknowledge, prefetchCount, getLogger());
 
             return amqpConsumer;
         } catch (final IOException ioe) {
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
index e4d7b1d162..195bd7adc4 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
 
 public class AMQPConsumerTest {
 
+    private static final int DEFAULT_PREFETCH_COUNT = 0;
     private ComponentLog processorLog;
 
     @BeforeEach
@@ -55,7 +56,7 @@ public class AMQPConsumerTest {
         final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
         final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, DEFAULT_PREFETCH_COUNT, processorLog);
         consumer.getChannel().basicPublish("myExchange", "key1", new 
BasicProperties(), new byte[0]);
 
         consumer.close();
@@ -69,7 +70,7 @@ public class AMQPConsumerTest {
         final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
         final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, DEFAULT_PREFETCH_COUNT, processorLog);
 
         assertFalse(consumer.closed);
 
@@ -80,14 +81,14 @@ public class AMQPConsumerTest {
 
     @Test
     public void failOnNullConnection() {
-        assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+        assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, DEFAULT_PREFETCH_COUNT, processorLog));
     }
 
     @Test
     public void failOnNullQueueName() {
         assertThrows(IllegalArgumentException.class, () -> {
             Connection conn = new TestConnection(null, null);
-            new AMQPConsumer(conn, null, true, processorLog);
+            new AMQPConsumer(conn, null, true, DEFAULT_PREFETCH_COUNT, 
processorLog);
         });
     }
 
@@ -95,7 +96,7 @@ public class AMQPConsumerTest {
     public void failOnEmptyQueueName() {
         assertThrows(IllegalArgumentException.class, () -> {
             Connection conn = new TestConnection(null, null);
-            new AMQPConsumer(conn, " ", true, processorLog);
+            new AMQPConsumer(conn, " ", true, DEFAULT_PREFETCH_COUNT, 
processorLog);
         });
     }
 
@@ -103,7 +104,7 @@ public class AMQPConsumerTest {
     public void failOnNonExistingQueue() {
         assertThrows(IOException.class, () -> {
             Connection conn = new TestConnection(null, null);
-            try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, 
processorLog)) {
+            try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, 
DEFAULT_PREFETCH_COUNT, processorLog)) {
                 consumer.consume();
             }
         });
@@ -117,7 +118,7 @@ public class AMQPConsumerTest {
         exchangeToRoutingKeymap.put("", "queue1");
 
         Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
processorLog)) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
DEFAULT_PREFETCH_COUNT, processorLog)) {
             GetResponse response = consumer.consume();
             assertNull(response);
         }
@@ -132,9 +133,20 @@ public class AMQPConsumerTest {
 
         Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
         conn.createChannel().basicPublish("myExchange", "key1", null, "hello 
Joe".getBytes());
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
processorLog)) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
DEFAULT_PREFETCH_COUNT, processorLog)) {
             GetResponse response = consumer.consume();
             assertNotNull(response);
         }
     }
+
+    @Test
+    public void validatePrefetchSet() throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
100, processorLog)) {
+            TestChannel channel = (TestChannel)consumer.getChannel();
+            assertEquals(100, channel.getPrefetchCount());
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 0655caaa21..fce2c260a1 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -393,7 +393,9 @@ public class ConsumeAMQPTest {
                     throw new IllegalStateException("Consumer already 
created");
                 }
 
-                consumer = new AMQPConsumer(connection, 
context.getProperty(ConsumeAMQP.QUEUE).getValue(), 
context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
+                consumer = new AMQPConsumer(connection, 
context.getProperty(ConsumeAMQP.QUEUE).getValue(),
+                        
context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), 
context.getProperty(ConsumeAMQP.PREFETCH_COUNT).asInteger(),
+                        getLogger());
                 return consumer;
             } catch (IOException e) {
                 throw new ProcessException(e);
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
index c68ffeefd1..17b267eee1 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -77,6 +77,7 @@ class TestChannel implements Channel {
     private long deliveryTag = 0L;
     private final BitSet acknowledgments = new BitSet();
     private final BitSet nacks = new BitSet();
+    private int prefetchCount = 0;
 
     public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
             Map<String, List<String>> routingKeyToQueueMappings) {
@@ -222,8 +223,11 @@ class TestChannel implements Channel {
 
     @Override
     public void basicQos(int prefetchCount) throws IOException {
-        throw new UnsupportedOperationException("This method is not currently 
supported as it is not used by current API in testing");
+        this.prefetchCount = prefetchCount;
+    }
 
+    public int getPrefetchCount() {
+        return this.prefetchCount;
     }
 
     @Override

Reply via email to