Re: [PR] NIFI-6730 AMQP QoS support [nifi]
asfgit closed pull request #8146: NIFI-6730 AMQP QoS support URL: https://github.com/apache/nifi/pull/8146 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1426806679

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it." +
+                "Allowed values: from 0 to 65535. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
Yes, thanks for pointing this out. Removed unused imports.
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
turcsanyip commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1426750707

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it." +
+                "Allowed values: from 0 to 65535. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
Thanks @MooseTheBrown! Could you please also remove the now unused imports? The build fails with Checkstyle violations. You can check it by running `mvn clean verify -P contrib-check` in the `nifi-amqp-bundle` directory.
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1426414623

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it." +
+                "Allowed values: from 0 to 65535. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
Corrected. Used the standard long validator instead of `customValidate` in `ConsumeAMQP`.
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
turcsanyip commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425813999

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it." +
+                "Allowed values: from 0 to 65535. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
@MooseTheBrown Thanks for adding `Prefetch Count` property! Please note, there is a standard validator for checking if a number is within a range: https://github.com/apache/nifi/blob/5b664147eeccd9d2a8ded9040a3d64c96e70d639/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java#L758
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425453995

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .defaultValue("0")
+        .required(true)
+        .build();

Review Comment:
You are correct. I am sorry about that; I am not sure what I was thinking when I entered `AbstractAMQPProcessor`.
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425443615

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -43,6 +43,7 @@ public class AMQPConsumerTest {
 private ComponentLog processorLog;
+private static final int DEFAULT_PREFETCH_COUNT = 0;

Review Comment:
```suggestion
private static final int DEFAULT_PREFETCH_COUNT = 0;
private ComponentLog processorLog;
```

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -301,7 +316,8 @@ protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext contex
 try {
     final String queueName = context.getProperty(QUEUE).getValue();
     final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
-    final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger());
+    final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge,
+            context.getProperty(PREFETCH_COUNT).asInteger(), getLogger());

Review Comment:
```suggestion
final int prefetchCount = context.getProperty(PREFETCH_COUNT).asInteger();
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, prefetchCount, getLogger());
```
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425103142

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws IOException {
 @Test
 public void failOnNullConnection() {
-    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
+    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, 0, processorLog));
 }
 @Test
 public void failOnNullQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null, true, processorLog);
+        new AMQPConsumer(conn, null, true, 0, processorLog);
     });
 }
 @Test
 public void failOnEmptyQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, " ", true, processorLog);
+        new AMQPConsumer(conn, " ", true, 0, processorLog);
     });
 }
 @Test
 public void failOnNonExistingQueue() {
     assertThrows(IOException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, processorLog)) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, 0, processorLog)) {

Review Comment:
Fixed

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -117,7 +117,7 @@ public void validateSuccessfullConsumeWithEmptyQueueDefaultExchange() throws Exc
 exchangeToRoutingKeymap.put("", "queue1");
 Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 0, processorLog)) {

Review Comment:
Fixed

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -132,9 +132,20 @@ public void validateSuccessfullConsumeWithEmptyQueue() throws Exception {
 Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
 conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 0, processorLog)) {

Review Comment:
Fixed
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425102857

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws IOException {
 @Test
 public void failOnNullConnection() {
-    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
+    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, 0, processorLog));

Review Comment:
Fixed

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws IOException {
 @Test
 public void failOnNullConnection() {
-    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
+    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, 0, processorLog));
 }
 @Test
 public void failOnNullQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null, true, processorLog);
+        new AMQPConsumer(conn, null, true, 0, processorLog);

Review Comment:
Fixed

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws IOException {
 @Test
 public void failOnNullConnection() {
-    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
+    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, 0, processorLog));
 }
 @Test
 public void failOnNullQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null, true, processorLog);
+        new AMQPConsumer(conn, null, true, 0, processorLog);
     });
 }
 @Test
 public void failOnEmptyQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, " ", true, processorLog);
+        new AMQPConsumer(conn, " ", true, 0, processorLog);

Review Comment:
Fixed
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425102583

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -55,7 +55,7 @@ public void testResponseQueueDrained() throws TimeoutException, IOException {
 final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, 0, processorLog);

Review Comment:
Fixed.

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -69,7 +69,7 @@ public void testConsumerHandlesCancelling() throws IOException {
 final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, 0, processorLog);

Review Comment:
Fixed
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425102068

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .defaultValue("0")
+        .required(true)
+        .build();

Review Comment:
Added `customValidate` to `ConsumeAMQP` with logic to check the prefetch count value, since the prefetch count property is declared in that class.
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424500623

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java: ##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
         .defaultValue("10")
         .required(true)
         .build();
+static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+        .name("prefetch.count")
+        .displayName("Prefetch Count")
+        .description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will " +
+                "no longer send new messages until consumer acknowledges some of the messages already delivered to it. 0 means no limit")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .defaultValue("0")
+        .required(true)
+        .build();

Review Comment:
Per the [Rabbit javadocs](https://rabbitmq.github.io/rabbitmq-java-client/api/4.x.x/com/rabbitmq/client/Channel.html#basicQos) it seems the prefetch count must be an integer between 0 and 65535. You should add logic to the `AbstractAMQPProcessor#customValidate` method to ensure the integer entered does not exceed 65535.
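The range check requested above can be sketched in plain Java, without any NiFi dependency. This is only an illustration of the rule under discussion (an integer from 0 to 65535 inclusive, as quoted from the RabbitMQ javadocs); the class `PrefetchCountCheck` and its `validate` method are hypothetical names and are not code from the PR or from NiFi.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not part of NiFi or of PR #8146.
final class PrefetchCountCheck {
    // Bounds taken from the review comment (0 to 65535 inclusive).
    static final long MIN_PREFETCH_COUNT = 0;
    static final long MAX_PREFETCH_COUNT = 65535;

    private PrefetchCountCheck() {
    }

    /** Returns an explanation when the raw value is invalid, or an empty Optional when it is acceptable. */
    static Optional<String> validate(final String rawValue) {
        if (rawValue == null) {
            return Optional.of("Prefetch Count must be set");
        }
        final long value;
        try {
            value = Long.parseLong(rawValue.trim());
        } catch (final NumberFormatException e) {
            return Optional.of("'" + rawValue + "' is not a whole number");
        }
        if (value < MIN_PREFETCH_COUNT || value > MAX_PREFETCH_COUNT) {
            return Optional.of("Prefetch Count must be between " + MIN_PREFETCH_COUNT
                    + " and " + MAX_PREFETCH_COUNT + ", but was " + value);
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Print the verdict for a few boundary and malformed inputs.
        for (final String candidate : new String[] {"0", "65535", "65536", "-1", "abc"}) {
            System.out.println(candidate + " -> " + validate(candidate).orElse("valid"));
        }
    }
}
```

Parsing as `long` rather than `int` means that values beyond the `int` range are still reported as out of range instead of as malformed numbers.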
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424437510

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -55,7 +55,7 @@ public void testResponseQueueDrained() throws TimeoutException, IOException {
 final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, 0, processorLog);

Review Comment:
Create a static final variable to clearly indicate the default prefetch count is being used e.g. `private static final int DEFAULT_PREFETCH_COUNT = 0;` similarly to what you did in `TestChannel` where you declared a variable for `prefetchCount`
```suggestion
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog);
```
Re: [PR] NIFI-6730 AMQP QoS support [nifi]
dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424438916

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws IOException {
 @Test
 public void failOnNullConnection() {
-    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
+    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, 0, processorLog));
 }
 @Test
 public void failOnNullQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null, true, processorLog);
+        new AMQPConsumer(conn, null, true, 0, processorLog);
     });
 }
 @Test
 public void failOnEmptyQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, " ", true, processorLog);
+        new AMQPConsumer(conn, " ", true, 0, processorLog);

Review Comment:
```suggestion
new AMQPConsumer(conn, " ", true, DEFAULT_PREFETCH_COUNT, processorLog);
```

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -69,7 +69,7 @@ public void testConsumerHandlesCancelling() throws IOException {
 final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, 0, processorLog);

Review Comment:
```suggestion
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog);
```

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -132,9 +132,20 @@ public void validateSuccessfullConsumeWithEmptyQueue() throws Exception {
 Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
 conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 0, processorLog)) {

Review Comment:
```suggestion
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog)) {
```

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws IOException {
 @Test
 public void failOnNullConnection() {
-    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
+    assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, 0, processorLog));
 }
 @Test
 public void failOnNullQueueName() {
     assertThrows(IllegalArgumentException.class, () -> {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null, true, processorLog);
+        new AMQPConsumer(conn, null, true, 0, processorLog);

Review Comment:
```suggestion
new AMQPConsumer(conn, null, true, DEFAULT_PREFETCH_COUNT, processorLog);
```

## nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java: ##
@@ -55,7 +55,7 @@ public void testResponseQueueDrained() throws TimeoutException, IOException {
 final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, 0, processorLog);

Review Comment:
Create a static final variable to clearly indicate the default prefetch count is being used e.g. `private static final int DEFAULT_PREFETCH_COUNT = 0;` similarly to what you do in `TestChannel` where you declare a variable for `prefetchCount`
```suggestion
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, proces
```
[PR] NIFI-6730 AMQP QoS support [nifi]
MooseTheBrown opened a new pull request, #8146:
URL: https://github.com/apache/nifi/pull/8146

# Summary

[NIFI-6730](https://issues.apache.org/jira/browse/NIFI-6730) Implement basicQos (prefetch count) support in ConsumeAMQP processor.

# Tracking

Please complete the following tracking steps prior to pull request creation.

### Issue Tracking

- [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created

### Pull Request Tracking

- [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-0`
- [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-0`

### Pull Request Formatting

- [x] Pull Request based on current revision of the `main` branch
- [x] Pull Request refers to a feature branch with one commit containing changes

# Verification

Please indicate the verification steps performed prior to pull request creation.

### Build

- [x] Build completed using `mvn clean install -P contrib-check`
- [x] JDK 21

### Licensing

- [x] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
- [x] New dependencies are documented in applicable `LICENSE` and `NOTICE` files

### Documentation

- [x] Documentation formatting appears as expected in rendered files
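For readers unfamiliar with prefetch count, the behaviour described in the `Prefetch Count` property text of this PR (delivery pauses once the consumer holds that many unacknowledged messages, and 0 means no limit) can be modelled with a toy in-memory queue. This is a simplified sketch, not how RabbitMQ or `AMQPConsumer` is implemented; `ToyBroker` and all of its methods are hypothetical names invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model for illustration only; not a RabbitMQ or NiFi API.
final class ToyBroker {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int prefetchCount; // 0 means no limit
    private int unacknowledged = 0;

    ToyBroker(final int prefetchCount) {
        if (prefetchCount < 0) {
            throw new IllegalArgumentException("prefetchCount must not be negative");
        }
        this.prefetchCount = prefetchCount;
    }

    void publish(final String message) {
        queue.addLast(message);
    }

    /** Returns the next message, or null when the queue is empty or the prefetch window is full. */
    String deliverNext() {
        final boolean windowFull = prefetchCount > 0 && unacknowledged >= prefetchCount;
        if (windowFull || queue.isEmpty()) {
            return null;
        }
        unacknowledged++;
        return queue.pollFirst();
    }

    /** Acknowledges one previously delivered message, freeing one slot in the window. */
    void acknowledge() {
        if (unacknowledged > 0) {
            unacknowledged--;
        }
    }

    public static void main(final String[] args) {
        final ToyBroker broker = new ToyBroker(2);
        for (int i = 1; i <= 4; i++) {
            broker.publish("m" + i);
        }
        System.out.println(broker.deliverNext()); // m1
        System.out.println(broker.deliverNext()); // m2
        System.out.println(broker.deliverNext()); // null: two messages are still unacknowledged
        broker.acknowledge();
        System.out.println(broker.deliverNext()); // m3
    }
}
```

With a prefetch count of 0 the `windowFull` check never triggers, which mirrors the "0 means no limit" wording in the property description.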