This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 76613a0ed4 NIFI-6730 AMQP QoS support 76613a0ed4 is described below commit 76613a0ed4a90c5e264e0537990278ec9e422536 Author: Mikhail Sapozhnikov <masapozhni...@yandex.ru> AuthorDate: Fri Dec 8 14:38:14 2023 +0300 NIFI-6730 AMQP QoS support This closes #8146. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../apache/nifi/amqp/processors/AMQPConsumer.java | 4 +++- .../apache/nifi/amqp/processors/ConsumeAMQP.java | 15 +++++++++++- .../nifi/amqp/processors/AMQPConsumerTest.java | 28 +++++++++++++++------- .../nifi/amqp/processors/ConsumeAMQPTest.java | 4 +++- .../apache/nifi/amqp/processors/TestChannel.java | 6 ++++- 5 files changed, 45 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java index e11044845c..04f951836a 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java @@ -43,7 +43,8 @@ final class AMQPConsumer extends AMQPWorker { private final boolean autoAcknowledge; private final Consumer consumer; - AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, ComponentLog processorLog) throws IOException { + AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, final int prefetchCount, + ComponentLog processorLog) throws IOException { super(connection, processorLog); this.validateStringProperty("queueName", queueName); this.queueName = queueName; @@ -80,6 +81,7 @@ final class AMQPConsumer extends AMQPWorker { } }; + channel.basicQos(prefetchCount); channel.basicConsume(queueName, autoAcknowledge, consumer); } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java index 23552d6430..6d4e5d01a6 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java @@ -112,6 +112,17 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { .defaultValue("10") .required(true) .build(); + static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder() + .name("prefetch.count") + .displayName("Prefetch Count") + .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " + + "no longer send new messages until consumer acknowledges some of the messages already delivered to it." + + "Allowed values: from 0 to 65535. 0 means no limit") + .addValidator(StandardValidators.createLongValidator(0, 65535, true)) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue("0") + .required(true) + .build(); public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder() .name("header.format") @@ -167,6 +178,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { properties.add(QUEUE); properties.add(AUTO_ACKNOWLEDGE); properties.add(BATCH_SIZE); + properties.add(PREFETCH_COUNT); properties.add(HEADER_FORMAT); properties.add(HEADER_KEY_PREFIX); properties.add(HEADER_SEPARATOR); @@ -301,7 +313,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { try { final String queueName = context.getProperty(QUEUE).getValue(); final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(); - final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger()); + final int prefetchCount = context.getProperty(PREFETCH_COUNT).asInteger(); + final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, prefetchCount, getLogger()); return amqpConsumer; } catch (final IOException ioe) { diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java index e4d7b1d162..195bd7adc4 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java @@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test; public class AMQPConsumerTest { + private static final int DEFAULT_PREFETCH_COUNT = 0; private ComponentLog processorLog; @BeforeEach @@ -55,7 +56,7 @@ public class AMQPConsumerTest { final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog); + final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog); consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]); consumer.close(); @@ -69,7 +70,7 @@ public class AMQPConsumerTest { final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog); + final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog); assertFalse(consumer.closed); @@ -80,14 +81,14 @@ public class AMQPConsumerTest { @Test public void failOnNullConnection() { - assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog)); + assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, DEFAULT_PREFETCH_COUNT, processorLog)); } @Test public void failOnNullQueueName() { assertThrows(IllegalArgumentException.class, () -> { Connection conn = new TestConnection(null, null); - new AMQPConsumer(conn, null, true, processorLog); + new AMQPConsumer(conn, null, true, DEFAULT_PREFETCH_COUNT, processorLog); }); } @@ -95,7 +96,7 @@ public class AMQPConsumerTest { public void failOnEmptyQueueName() { assertThrows(IllegalArgumentException.class, () -> { Connection conn = new TestConnection(null, null); - new AMQPConsumer(conn, " ", true, processorLog); + new AMQPConsumer(conn, " ", true, DEFAULT_PREFETCH_COUNT, processorLog); }); } @@ -103,7 +104,7 @@ public class AMQPConsumerTest { public void failOnNonExistingQueue() { assertThrows(IOException.class, () -> { Connection conn = new TestConnection(null, null); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, processorLog)) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, DEFAULT_PREFETCH_COUNT, processorLog)) { consumer.consume(); } }); @@ -117,7 +118,7 @@ public class AMQPConsumerTest { exchangeToRoutingKeymap.put("", "queue1"); Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog)) { GetResponse response = consumer.consume(); assertNull(response); } @@ -132,9 +133,20 @@ public class AMQPConsumerTest { Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes()); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog)) { GetResponse response = consumer.consume(); assertNotNull(response); } } + + @Test + public void validatePrefetchSet() throws Exception { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 100, processorLog)) { + TestChannel channel = (TestChannel)consumer.getChannel(); + assertEquals(100, channel.getPrefetchCount()); + } + } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 0655caaa21..fce2c260a1 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -393,7 +393,9 @@ public class ConsumeAMQPTest { throw new IllegalStateException("Consumer already created"); } - consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(), context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger()); + consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(), + context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), context.getProperty(ConsumeAMQP.PREFETCH_COUNT).asInteger(), + getLogger()); return consumer; } catch (IOException e) { throw new ProcessException(e); diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index c68ffeefd1..17b267eee1 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -77,6 +77,7 @@ class TestChannel implements Channel { private long deliveryTag = 0L; private final BitSet acknowledgments = new BitSet(); private final BitSet nacks = new BitSet(); + private int prefetchCount = 0; public TestChannel(Map<String, String> exchangeToRoutingKeyMappings, Map<String, List<String>> routingKeyToQueueMappings) { @@ -222,8 +223,11 @@ class TestChannel implements Channel { @Override public void basicQos(int prefetchCount) throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + this.prefetchCount = prefetchCount; + } + public int getPrefetchCount() { + return this.prefetchCount; } @Override