This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e1bdb2955f CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, nack (#8147)
6e1bdb2955f is described below

commit 6e1bdb2955fb6de483d5ef95042ac33377865600
Author: Anthony Sam Wu <55711988+anthonywu-to...@users.noreply.github.com>
AuthorDate: Sat Aug 13 04:33:22 2022 -0400

    CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, nack (#8147)
    
    * CAMEL-18376: camel-pulsar; add redelivery backoff for ack timeout, negative ack
    
    * Fix import order/linebreak
    
    * Apply formatting
    
    * Provide ack timeout in negative ack test
    
    * Rebind in test
    
    * Set backoff not in endpoint query param
    
    * Fix import
    
    * Fix tests
---
 .../pulsar/PulsarComponentConfigurer.java          | 12 +++++++
 .../component/pulsar/PulsarEndpointConfigurer.java | 12 +++++++
 .../component/pulsar/PulsarEndpointUriFactory.java |  4 ++-
 .../org/apache/camel/component/pulsar/pulsar.json  |  4 +++
 .../component/pulsar/PulsarConfiguration.java      | 27 +++++++++++++++
 .../consumers/CommonCreationStrategyImpl.java      | 10 ++++++
 .../PulsarConsumerDeadLetterPolicyIT.java          |  6 ++--
 ...> PulsarConsumerNegativeAcknowledgementIT.java} | 38 +++++++++++++++-------
 .../PulsarConsumerNoAcknowledgementIT.java         | 29 +++++++++++++++--
 9 files changed, 124 insertions(+), 18 deletions(-)

diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index 86afa21c75a..97dab737df9 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -32,6 +32,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "ackGroupTimeMillis": 
getOrCreateConfiguration(target).setAckGroupTimeMillis(property(camelContext, 
long.class, value)); return true;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": 
getOrCreateConfiguration(target).setAckTimeoutMillis(property(camelContext, 
long.class, value)); return true;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": 
getOrCreateConfiguration(target).setAckTimeoutRedeliveryBackoff(property(camelContext,
 org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": 
getOrCreateConfiguration(target).setAllowManualAcknowledgement(property(camelContext,
 boolean.class, value)); return true;
         case "authenticationclass":
@@ -81,6 +83,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "messageRouter": 
getOrCreateConfiguration(target).setMessageRouter(property(camelContext, 
org.apache.pulsar.client.api.MessageRouter.class, value)); return true;
         case "messageroutingmode":
         case "messageRoutingMode": 
getOrCreateConfiguration(target).setMessageRoutingMode(property(camelContext, 
org.apache.pulsar.client.api.MessageRoutingMode.class, value)); return true;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": 
getOrCreateConfiguration(target).setNegativeAckRedeliveryBackoff(property(camelContext,
 org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": 
getOrCreateConfiguration(target).setNegativeAckRedeliveryDelayMicros(property(camelContext,
 long.class, value)); return true;
         case "numberofconsumerthreads":
@@ -125,6 +129,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "ackGroupTimeMillis": return long.class;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return long.class;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return 
org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return boolean.class;
         case "authenticationclass":
@@ -174,6 +180,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "messageRouter": return 
org.apache.pulsar.client.api.MessageRouter.class;
         case "messageroutingmode":
         case "messageRoutingMode": return 
org.apache.pulsar.client.api.MessageRoutingMode.class;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return 
org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return long.class;
         case "numberofconsumerthreads":
@@ -214,6 +222,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "ackGroupTimeMillis": return 
getOrCreateConfiguration(target).getAckGroupTimeMillis();
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return 
getOrCreateConfiguration(target).getAckTimeoutMillis();
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return 
getOrCreateConfiguration(target).getAckTimeoutRedeliveryBackoff();
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return 
getOrCreateConfiguration(target).isAllowManualAcknowledgement();
         case "authenticationclass":
@@ -263,6 +273,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "messageRouter": return 
getOrCreateConfiguration(target).getMessageRouter();
         case "messageroutingmode":
         case "messageRoutingMode": return 
getOrCreateConfiguration(target).getMessageRoutingMode();
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return 
getOrCreateConfiguration(target).getNegativeAckRedeliveryBackoff();
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return 
getOrCreateConfiguration(target).getNegativeAckRedeliveryDelayMicros();
         case "numberofconsumerthreads":
diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index f19746e95f0..08be79b9ea5 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -25,6 +25,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "ackGroupTimeMillis": 
target.getPulsarConfiguration().setAckGroupTimeMillis(property(camelContext, 
long.class, value)); return true;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": 
target.getPulsarConfiguration().setAckTimeoutMillis(property(camelContext, 
long.class, value)); return true;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": 
target.getPulsarConfiguration().setAckTimeoutRedeliveryBackoff(property(camelContext,
 org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": 
target.getPulsarConfiguration().setAllowManualAcknowledgement(property(camelContext,
 boolean.class, value)); return true;
         case "authenticationclass":
@@ -73,6 +75,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "messageRouter": 
target.getPulsarConfiguration().setMessageRouter(property(camelContext, 
org.apache.pulsar.client.api.MessageRouter.class, value)); return true;
         case "messageroutingmode":
         case "messageRoutingMode": 
target.getPulsarConfiguration().setMessageRoutingMode(property(camelContext, 
org.apache.pulsar.client.api.MessageRoutingMode.class, value)); return true;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": 
target.getPulsarConfiguration().setNegativeAckRedeliveryBackoff(property(camelContext,
 org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": 
target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext,
 long.class, value)); return true;
         case "numberofconsumerthreads":
@@ -108,6 +112,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "ackGroupTimeMillis": return long.class;
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return long.class;
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return 
org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return boolean.class;
         case "authenticationclass":
@@ -156,6 +162,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "messageRouter": return 
org.apache.pulsar.client.api.MessageRouter.class;
         case "messageroutingmode":
         case "messageRoutingMode": return 
org.apache.pulsar.client.api.MessageRoutingMode.class;
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return 
org.apache.pulsar.client.api.RedeliveryBackoff.class;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return long.class;
         case "numberofconsumerthreads":
@@ -192,6 +200,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "ackGroupTimeMillis": return 
target.getPulsarConfiguration().getAckGroupTimeMillis();
         case "acktimeoutmillis":
         case "ackTimeoutMillis": return 
target.getPulsarConfiguration().getAckTimeoutMillis();
+        case "acktimeoutredeliverybackoff":
+        case "ackTimeoutRedeliveryBackoff": return 
target.getPulsarConfiguration().getAckTimeoutRedeliveryBackoff();
         case "allowmanualacknowledgement":
         case "allowManualAcknowledgement": return 
target.getPulsarConfiguration().isAllowManualAcknowledgement();
         case "authenticationclass":
@@ -240,6 +250,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "messageRouter": return 
target.getPulsarConfiguration().getMessageRouter();
         case "messageroutingmode":
         case "messageRoutingMode": return 
target.getPulsarConfiguration().getMessageRoutingMode();
+        case "negativeackredeliverybackoff":
+        case "negativeAckRedeliveryBackoff": return 
target.getPulsarConfiguration().getNegativeAckRedeliveryBackoff();
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": return 
target.getPulsarConfiguration().getNegativeAckRedeliveryDelayMicros();
         case "numberofconsumerthreads":
diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
index 87e2015e5be..3bb04d4d8c6 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
@@ -21,9 +21,10 @@ public class PulsarEndpointUriFactory extends 
org.apache.camel.support.component
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(42);
+        Set<String> props = new HashSet<>(44);
         props.add("ackGroupTimeMillis");
         props.add("ackTimeoutMillis");
+        props.add("ackTimeoutRedeliveryBackoff");
         props.add("allowManualAcknowledgement");
         props.add("authenticationClass");
         props.add("authenticationParams");
@@ -49,6 +50,7 @@ public class PulsarEndpointUriFactory extends 
org.apache.camel.support.component
         props.add("messageRouter");
         props.add("messageRoutingMode");
         props.add("namespace");
+        props.add("negativeAckRedeliveryBackoff");
         props.add("negativeAckRedeliveryDelayMicros");
         props.add("numberOfConsumerThreads");
         props.add("numberOfConsumers");
diff --git 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index c28d2bb37a1..d2302601763 100644
--- 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -28,6 +28,7 @@
     "serviceUrl": { "kind": "property", "displayName": "Service Url", "group": 
"common", "label": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "configuration", "description": "The Pulsar Service URL 
to point while creating the client from URI" },
     "ackGroupTimeMillis": { "kind": "property", "displayName": "Ack Group Time 
Millis", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 100, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Group the consumer acknowledgments for the 
specified time in milliseconds - defaults to 100" },
     "ackTimeoutMillis": { "kind": "property", "displayName": "Ack Timeout 
Millis", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10000, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Timeout for unacknowledged messages in 
milliseconds - defaults to 10000" },
+    "ackTimeoutRedeliveryBackoff": { "kind": "property", "displayName": "Ack 
Timeout Redelivery Backoff", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "RedeliveryBackoff to use for ack timeout 
redelivery b [...]
     "allowManualAcknowledgement": { "kind": "property", "displayName": "Allow 
Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "configuration", "description": "Whether to allow manual 
message acknowledgements. If this option is ena [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "consumerName": { "kind": "property", "displayName": "Consumer Name", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "sole-consumer", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Name of the consumer when subscription is 
EXCLUSIVE" },
@@ -36,6 +37,7 @@
     "deadLetterTopic": { "kind": "property", "displayName": "Dead Letter 
Topic", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Name of the topic where the messages which 
fail maxRedeliverCount times will be sent. Note: if not set, defa [...]
     "maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Maximum number of times that a message will be 
redelivered before being sent to the dead letter queue. [...]
     "messageListener": { "kind": "property", "displayName": "Message 
Listener", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Whether to use the messageListener interface, 
or to receive messages using a separate thread pool" },
+    "negativeAckRedeliveryBackoff": { "kind": "property", "displayName": 
"Negative Ack Redelivery Backoff", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "RedeliveryBackoff to use for negative ack 
redeliver [...]
     "negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
60000000, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Set the negative acknowledgement delay" },
     "numberOfConsumers": { "kind": "property", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Number of consumers - defaults to 1" },
     "numberOfConsumerThreads": { "kind": "property", "displayName": "Number Of 
Consumer Threads", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Number of threads to receive and handle 
messages when using a separate thread pool" },
@@ -90,6 +92,7 @@
     "serviceUrl": { "kind": "parameter", "displayName": "Service Url", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "The Pulsar Service URL to point while 
creating the client from URI" },
     "ackGroupTimeMillis": { "kind": "parameter", "displayName": "Ack Group 
Time Millis", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 100, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Group the consumer acknowledgments for 
the specified time in milliseconds - defaults [...]
     "ackTimeoutMillis": { "kind": "parameter", "displayName": "Ack Timeout 
Millis", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10000, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Timeout for unacknowledged messages in 
milliseconds - defaults to 10000" },
+    "ackTimeoutRedeliveryBackoff": { "kind": "parameter", "displayName": "Ack 
Timeout Redelivery Backoff", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "RedeliveryBackoff to use for ack timeout 
redel [...]
     "allowManualAcknowledgement": { "kind": "parameter", "displayName": "Allow 
Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Whether to allow 
manual message acknowledgements. If this option [...]
     "consumerName": { "kind": "parameter", "displayName": "Consumer Name", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "sole-consumer", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Name of the consumer when subscription 
is EXCLUSIVE" },
     "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name 
Prefix", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "cons", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Prefix to add to consumer names when a 
SHARED or FAILOVER subscription  [...]
@@ -97,6 +100,7 @@
     "deadLetterTopic": { "kind": "parameter", "displayName": "Dead Letter 
Topic", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Name of the topic where the messages 
which fail maxRedeliverCount times will be sent. Note: if not se [...]
     "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Maximum number of times that a message 
will be redelivered before being sent to the dead letter [...]
     "messageListener": { "kind": "parameter", "displayName": "Message 
Listener", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Whether to use the messageListener 
interface, or to receive messages using a separate th [...]
+    "negativeAckRedeliveryBackoff": { "kind": "parameter", "displayName": 
"Negative Ack Redelivery Backoff", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "RedeliveryBackoff to use for negative 
ack re [...]
     "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
60000000, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Set the negative acknowledgement delay" 
},
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
     "numberOfConsumerThreads": { "kind": "parameter", "displayName": "Number 
Of Consumer Threads", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Number of threads to receive and handle 
messages when using a separate thread [...]
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 5f22892bc5c..1d9aa490e71 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 
 import static 
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.LATEST;
@@ -65,6 +66,10 @@ public class PulsarConfiguration implements Cloneable {
     private long ackTimeoutMillis = 10000;
     @UriParam(label = "consumer", defaultValue = "60000000")
     private long negativeAckRedeliveryDelayMicros = 60000000;
+    @UriParam(label = "consumer", description = "RedeliveryBackoff to use for 
ack timeout redelivery backoff.")
+    private RedeliveryBackoff ackTimeoutRedeliveryBackoff;
+    @UriParam(label = "consumer", description = "RedeliveryBackoff to use for 
negative ack redelivery backoff.")
+    private RedeliveryBackoff negativeAckRedeliveryBackoff;
     @UriParam(label = "consumer", defaultValue = "100")
     private long ackGroupTimeMillis = 100;
     @UriParam(label = "consumer", defaultValue = "LATEST")
@@ -455,6 +460,28 @@ public class PulsarConfiguration implements Cloneable {
         this.negativeAckRedeliveryDelayMicros = 
negativeAckRedeliveryDelayMicros;
     }
 
+    public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
+        return ackTimeoutRedeliveryBackoff;
+    }
+
+    /**
+     * Set a RedeliveryBackoff to use for ack timeout redelivery backoff.
+     */
+    public void setAckTimeoutRedeliveryBackoff(RedeliveryBackoff 
redeliveryBackoff) {
+        this.ackTimeoutRedeliveryBackoff = redeliveryBackoff;
+    }
+
+    public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
+        return negativeAckRedeliveryBackoff;
+    }
+
+    /**
+     * Set a RedeliveryBackoff to use for negative ack redelivery backoff.
+     */
+    public void setNegativeAckRedeliveryBackoff(RedeliveryBackoff 
redeliveryBackoff) {
+        this.negativeAckRedeliveryBackoff = redeliveryBackoff;
+    }
+
     public Integer getMaxRedeliverCount() {
         return maxRedeliverCount;
     }
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index 2cdfa62625b..32446880a28 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -22,6 +22,7 @@ import org.apache.camel.component.pulsar.PulsarConfiguration;
 import org.apache.camel.component.pulsar.PulsarConsumer;
 import org.apache.camel.component.pulsar.PulsarEndpoint;
 import org.apache.camel.component.pulsar.PulsarMessageListener;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder;
@@ -66,6 +67,15 @@ public final class CommonCreationStrategyImpl {
 
             builder.deadLetterPolicy(policy.build());
         }
+
+        if 
(ObjectHelper.isNotEmpty(endpointConfiguration.getAckTimeoutRedeliveryBackoff()))
 {
+            
builder.ackTimeoutRedeliveryBackoff(endpointConfiguration.getAckTimeoutRedeliveryBackoff());
+        }
+
+        if 
(ObjectHelper.isNotEmpty(endpointConfiguration.getNegativeAckRedeliveryBackoff()))
 {
+            
builder.negativeAckRedeliveryBackoff(endpointConfiguration.getNegativeAckRedeliveryBackoff());
+        }
+
         return builder;
     }
 }
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
index 2d45cb44503..8b2223a5796 100644
--- 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
@@ -111,7 +111,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends 
PulsarITSupport {
     }
 
     @Test
-    public void 
givenMaxRedeliverCountverifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
+    public void 
givenMaxRedeliverCountVerifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
             throws Exception {
         PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
 
@@ -136,7 +136,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends 
PulsarITSupport {
     }
 
     @Test
-    public void 
givenMaxRedeliverCountAndDeadLetterTopicverifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
+    public void 
givenMaxRedeliverCountAndDeadLetterTopicVerifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
             throws Exception {
         PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
 
@@ -162,7 +162,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends 
PulsarITSupport {
     }
 
     @Test
-    public void 
givenOnlyDeadLetterTopicverifyMessageDoesNotGetSentToSpecifiedTopic() throws 
Exception {
+    public void 
givenOnlyDeadLetterTopicVerifyMessageDoesNotGetSentToSpecifiedTopic() throws 
Exception {
         PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
 
         PulsarEndpoint from = (PulsarEndpoint) component
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
similarity index 67%
copy from 
components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
copy to 
components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
index 026e0d2d41e..f5bb919fe0b 100644
--- 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
@@ -23,7 +23,9 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.pulsar.PulsarComponent;
+import org.apache.camel.component.pulsar.PulsarMessageReceipt;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.support.SimpleRegistry;
 import org.apache.pulsar.client.api.Producer;
@@ -31,16 +33,16 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
 import org.junit.jupiter.api.Test;
 
-public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
+public class PulsarConsumerNegativeAcknowledgementIT extends PulsarITSupport {
 
-    private static final String TOPIC_URI = 
"persistent://public/default/camel-topic";
+    private static final String TOPIC_URI = 
"persistent://public/default/camel-topic-negative-ack";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject("pulsar:" + TOPIC_URI + 
"?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&ackTimeoutMillis=1000")
+    @EndpointInject("pulsar:" + TOPIC_URI + 
"?numberOfConsumers=1&subscriptionType=Exclusive&batchingEnabled=false"
+                    + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -51,8 +53,12 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                // Nothing in the route will ack the message.
-                from(from).to(to);
+                // This route will explicitly negative acknowledge the message.
+                from(from)
+                        .process(exchange -> exchange.getIn()
+                                
.getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, PulsarMessageReceipt.class)
+                                .negativeAcknowledge())
+                        .to(to);
             }
         };
     }
@@ -74,7 +80,16 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        comp.getConfiguration().setAllowManualAcknowledgement(true); // Set to 
true here instead of the endpoint query parameter.
+        comp.getConfiguration()
+                .setAllowManualAcknowledgement(true); // Set to true here 
instead of the endpoint query parameter.
+        comp.getConfiguration().setAckTimeoutMillis(60_000L);
+        // With negativeAckRedeliveryDelayMicros equal to 1000 ms, redeliveries occur at 1s + 0.01s, 
1s + 1s, 1s + 100s, 1s + 100s, 1s + 100s...
+        
comp.getConfiguration().setNegativeAckRedeliveryDelayMicros(1_000_000L);
+        
comp.getConfiguration().setNegativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
+                .minDelayMs(10L)
+                .maxDelayMs(100_000L)
+                .multiplier(100.0)
+                .build());
         registry.bind("pulsar", comp);
     }
 
@@ -83,8 +98,8 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
     }
 
     @Test
-    public void testAMessageIsConsumedMultipleTimes() throws Exception {
-        to.expectedMinimumMessageCount(2);
+    public void testAMessageIsConsumedMultipleTimesWithNegativeAckBackoff() 
throws Exception {
+        to.expectedMessageCount(3);
 
         Producer<String> producer
                 = 
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
@@ -92,6 +107,7 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
         producer.send("Hello World!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
-    }
 
+        producer.close();
+    }
 }
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
index 026e0d2d41e..aed3a199f77 100644
--- 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
 import org.junit.jupiter.api.Test;
 
 public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
@@ -39,8 +40,7 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
     private static final String PRODUCER = "camel-producer-1";
 
     @EndpointInject("pulsar:" + TOPIC_URI + 
"?numberOfConsumers=1&subscriptionType=Exclusive"
-                    + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-                    + "&ackTimeoutMillis=1000")
+                    + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -74,7 +74,15 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        comp.getConfiguration().setAllowManualAcknowledgement(true); // Set to 
true here instead of the endpoint query parameter.
+        comp.getConfiguration()
+                .setAllowManualAcknowledgement(true); // Set to true here 
instead of the endpoint query parameter.
+        // With ackTimeoutMillis equal to 1000 ms, redeliveries occur at 1s + 0.01s, 
1s + 1s, 1s + 100s, 1s + 100s, 1s + 100s...
+        comp.getConfiguration().setAckTimeoutMillis(1_000L);
+        
comp.getConfiguration().setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
+                .minDelayMs(10L)
+                .maxDelayMs(100_000L)
+                .multiplier(100.0)
+                .build());
         registry.bind("pulsar", comp);
     }
 
@@ -92,6 +100,21 @@ public class PulsarConsumerNoAcknowledgementIT extends 
PulsarITSupport {
         producer.send("Hello World!");
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+        producer.close();
     }
 
+    @Test
+    public void testAMessageIsConsumedMultipleTimesWithAckTimeoutBackoff() 
throws Exception {
+        to.expectedMessageCount(3);
+
+        Producer<String> producer
+                = 
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+        producer.close();
+    }
 }

Reply via email to