Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-14 Thread via GitHub


asfgit closed pull request #8146: NIFI-6730 AMQP QoS support
URL: https://github.com/apache/nifi/pull/8146


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-14 Thread via GitHub


MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1426806679


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it."
+   + "Allowed values: from 0 to 65535. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
   Yes, thanks for pointing this out. Removed unused imports.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-14 Thread via GitHub


turcsanyip commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1426750707


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it."
+   + "Allowed values: from 0 to 65535. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
   Thanks @MooseTheBrown! Could you please also remove the now unused imports? 
The build fails with Checkstyle violations. You can check it with running `mvn 
clean verify -P contrib-check` in `nifi-amqp-bundle` directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-14 Thread via GitHub


MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1426414623


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it."
+   + "Allowed values: from 0 to 65535. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
   Corrected. Used standard long validator instead of customValidate in 
ConsumeAMQP



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


turcsanyip commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425813999


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +115,17 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it."
+   + "Allowed values: from 0 to 65535. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
   @MooseTheBrown Thanks for adding `Prefetch Count` property!
   Please note, there is a standard validator for checking if a number is 
within a range: 
https://github.com/apache/nifi/blob/5b664147eeccd9d2a8ded9040a3d64c96e70d639/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java#L758



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425453995


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.NONE)
+.defaultValue("0")
+.required(true)
+.build();

Review Comment:
   You are correct. I am sorry about that, I am not sure what i was thinking 
when I entered `AbstractAMQPProcessor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425443615


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -43,6 +43,7 @@
 public class AMQPConsumerTest {
 
 private ComponentLog processorLog;
+private static final int DEFAULT_PREFETCH_COUNT = 0;

Review Comment:
   ```suggestion
   private static final int DEFAULT_PREFETCH_COUNT = 0;
   private ComponentLog processorLog;
   ```



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -301,7 +316,8 @@ protected synchronized AMQPConsumer createAMQPWorker(final 
ProcessContext contex
 try {
 final String queueName = context.getProperty(QUEUE).getValue();
 final boolean autoAcknowledge = 
context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
-final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, 
queueName, autoAcknowledge, getLogger());
+final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, 
queueName, autoAcknowledge,
+context.getProperty(PREFETCH_COUNT).asInteger(), 
getLogger());

Review Comment:
   ```suggestion
   final int prefetchCount =  
context.getProperty(PREFETCH_COUNT).asInteger();
   final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, 
queueName, autoAcknowledge, prefetchCount, getLogger());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425103142


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 
 @Test
 public void failOnNullConnection() {
-assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, 0, processorLog));
 }
 
 @Test
 public void failOnNullQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, null, true, processorLog);
+new AMQPConsumer(conn, null, true, 0, processorLog);
 });
 }
 
 @Test
 public void failOnEmptyQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, " ", true, processorLog);
+new AMQPConsumer(conn, " ", true, 0, processorLog);
 });
 }
 
 @Test
 public void failOnNonExistingQueue() {
 assertThrows(IOException.class, () -> {
 Connection conn = new TestConnection(null, null);
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, 
processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, 
0, processorLog)) {

Review Comment:
   Fixed



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -117,7 +117,7 @@ public void 
validateSuccessfullConsumeWithEmptyQueueDefaultExchange() throws Exc
 exchangeToRoutingKeymap.put("", "queue1");
 
 Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 0, 
processorLog)) {

Review Comment:
   Fixed



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -132,9 +132,20 @@ public void validateSuccessfullConsumeWithEmptyQueue() 
throws Exception {
 
 Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
 conn.createChannel().basicPublish("myExchange", "key1", null, "hello 
Joe".getBytes());
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 0, 
processorLog)) {

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425102857


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 
 @Test
 public void failOnNullConnection() {
-assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, 0, processorLog));

Review Comment:
   Fixed



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 
 @Test
 public void failOnNullConnection() {
-assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, 0, processorLog));
 }
 
 @Test
 public void failOnNullQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, null, true, processorLog);
+new AMQPConsumer(conn, null, true, 0, processorLog);

Review Comment:
   Fixed



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 
 @Test
 public void failOnNullConnection() {
-assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, 0, processorLog));
 }
 
 @Test
 public void failOnNullQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, null, true, processorLog);
+new AMQPConsumer(conn, null, true, 0, processorLog);
 });
 }
 
 @Test
 public void failOnEmptyQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, " ", true, processorLog);
+new AMQPConsumer(conn, " ", true, 0, processorLog);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425102583


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -55,7 +55,7 @@ public void testResponseQueueDrained() throws 
TimeoutException, IOException {
 final Map exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
 final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, 0, processorLog);

Review Comment:
   Fixed.



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -69,7 +69,7 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 final Map exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
 final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, 0, processorLog);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-13 Thread via GitHub


MooseTheBrown commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1425102068


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.NONE)
+.defaultValue("0")
+.required(true)
+.build();

Review Comment:
   Added customValidate with logic to check prefetch count value to ConsumeAMQP 
since prefetch count is declared in its scope.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-12 Thread via GitHub


dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424500623


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.NONE)
+.defaultValue("0")
+.required(true)
+.build();

Review Comment:
   Per the [Rabbit 
javadocs](https://rabbitmq.github.io/rabbitmq-java-client/api/4.x.x/com/rabbitmq/client/Channel.html#basicQos)
 it seems the prefetch count must be an integer between 0 and 65535. You should 
add logic to `AbstractAMQPProcessor#customValidate`  method in order to ensure 
the integer entered is between 0 and  65535 and not greater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-12 Thread via GitHub


dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424437510


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -55,7 +55,7 @@ public void testResponseQueueDrained() throws 
TimeoutException, IOException {
 final Map exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
 final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, 0, processorLog);

Review Comment:
   Create a static final variable to clearly indicate the default prefetch 
count is being used e.g.
   `private static final int DEFAULT_PREFETCH_COUNT = 0;`
   similarly to what you did in `TestChannel` where you declared a variable for 
`prefetchCount`
   
   ```suggestion
   final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, DEFAULT_PREFETCH_COUNT, processorLog);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-12 Thread via GitHub


dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424500623


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##
@@ -112,6 +112,16 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor {
 .defaultValue("10")
 .required(true)
 .build();
+static final PropertyDescriptor PREFETCH_COUNT = new 
PropertyDescriptor.Builder()
+.name("prefetch.count")
+.displayName("Prefetch Count")
+.description("The maximum number of unacknowledged messages for the 
consumer. If consumer has this number of unacknowledged messages, AMQP broker 
will "
+   + "no longer send new messages until consumer acknowledges some 
of the messages already delivered to it. 0 means no limit")
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.NONE)
+.defaultValue("0")
+.required(true)
+.build();

Review Comment:
   Per the [Rabbit 
javadocs](https://rabbitmq.github.io/rabbitmq-java-client/api/4.x.x/com/rabbitmq/client/Channel.html#basicQos)
 it seems the prefetch count must be an integer between 0 and 65535. You should 
add logic to `AbstractAMQPProcessor#customValidate`  in order to ensure the 
integer entered is between 0 and  65535 and not greater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-12 Thread via GitHub


dan-s1 commented on code in PR #8146:
URL: https://github.com/apache/nifi/pull/8146#discussion_r1424438916


##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 
 @Test
 public void failOnNullConnection() {
-assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, 0, processorLog));
 }
 
 @Test
 public void failOnNullQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, null, true, processorLog);
+new AMQPConsumer(conn, null, true, 0, processorLog);
 });
 }
 
 @Test
 public void failOnEmptyQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, " ", true, processorLog);
+new AMQPConsumer(conn, " ", true, 0, processorLog);

Review Comment:
   ```suggestion
   new AMQPConsumer(conn, " ", true, DEFAULT_PREFETCH_COUNT, 
processorLog);
   ```



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -69,7 +69,7 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 final Map exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
 final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, 0, processorLog);

Review Comment:
   ```suggestion
   final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, DEFAULT_PREFETCH_COUNT, processorLog);
   ```



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -132,9 +132,20 @@ public void validateSuccessfullConsumeWithEmptyQueue() 
throws Exception {
 
 Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
 conn.createChannel().basicPublish("myExchange", "key1", null, "hello 
Joe".getBytes());
-try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
processorLog)) {
+try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 0, 
processorLog)) {

Review Comment:
   ```suggestion
   try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 
DEFAULT_PREFETCH_COUNT, processorLog)) {
   ```



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -80,30 +80,30 @@ public void testConsumerHandlesCancelling() throws 
IOException {
 
 @Test
 public void failOnNullConnection() {
-assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, processorLog));
+assertThrows(IllegalArgumentException.class, () -> new 
AMQPConsumer(null, null, true, 0, processorLog));
 }
 
 @Test
 public void failOnNullQueueName() {
 assertThrows(IllegalArgumentException.class, () -> {
 Connection conn = new TestConnection(null, null);
-new AMQPConsumer(conn, null, true, processorLog);
+new AMQPConsumer(conn, null, true, 0, processorLog);

Review Comment:
   ```suggestion
   new AMQPConsumer(conn, null, true, DEFAULT_PREFETCH_COUNT, 
processorLog);
   ```



##
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java:
##
@@ -55,7 +55,7 @@ public void testResponseQueueDrained() throws 
TimeoutException, IOException {
 final Map exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
 final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, processorLog);
+final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, 0, processorLog);

Review Comment:
   Create a static final variable to clearly indicate the default prefetch 
count is being used e.g.
   `private static final int DEFAULT_PREFETCH_COUNT = 0;`
   similarly to what you do in `TestChannel` where you declare a variable for 
`prefetchCount`
   
   ```suggestion
   final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true, DEFAULT_PREFETCH_COUNT, proces

[PR] NIFI-6730 AMQP QoS support [nifi]

2023-12-08 Thread via GitHub


MooseTheBrown opened a new pull request, #8146:
URL: https://github.com/apache/nifi/pull/8146

   
   
   
   
   
   
   
   
   
   
   
   
   
   # Summary
   
   [NIFI-6730](https://issues.apache.org/jira/browse/NIFI-6730)
   Implement basicQos (prefetch count) support in ConsumeAMQP processor.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue 
created
   
   ### Pull Request Tracking
   
   - [ x] Pull Request title starts with Apache NiFi Jira issue number, such as 
`NIFI-0`
   - [ x] Pull Request commit message starts with Apache NiFi Jira issue 
number, as such `NIFI-0`
   
   ### Pull Request Formatting
   
   - [ x] Pull Request based on current revision of the `main` branch
   - [ x] Pull Request refers to a feature branch with one commit containing 
changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request 
creation.
   
   ### Build
   
   - [x ] Build completed using `mvn clean install -P contrib-check`
 - [x ] JDK 21
   
   ### Licensing
   
   - [ x] New dependencies are compatible with the [Apache License 
2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License 
Policy](https://www.apache.org/legal/resolved.html)
   - [ x] New dependencies are documented in applicable `LICENSE` and `NOTICE` 
files
   
   ### Documentation
   
   - [ x] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org