This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new c736c07 CAMEL-15772: camel-pulsar readCompacted support (#4535)
c736c07 is described below
commit c736c07b9f82e93f8f3ca1edb9be5ba6de5bfc24
Author: ludovic-boutros <[email protected]>
AuthorDate: Fri Oct 30 05:56:29 2020 +0100
CAMEL-15772: camel-pulsar readCompacted support (#4535)
---
.../apache/camel/catalog/components/pulsar.json | 2 +
.../camel/catalog/docs/pulsar-component.adoc | 6 +-
.../pulsar/PulsarComponentConfigurer.java | 5 +
.../component/pulsar/PulsarEndpointConfigurer.java | 5 +
.../component/pulsar/PulsarEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/pulsar/pulsar.json | 2 +
.../src/main/docs/pulsar-component.adoc | 6 +-
.../component/pulsar/PulsarConfiguration.java | 13 ++
.../consumers/CommonCreationStrategyImpl.java | 3 +-
.../component/pulsar/PulsarComponentTest.java | 1 +
.../pulsar/PulsarConsumerReadCompactedTest.java | 155 +++++++++++++++++++++
.../dsl/PulsarComponentBuilderFactory.java | 13 ++
.../endpoint/dsl/PulsarEndpointBuilderFactory.java | 25 ++++
.../modules/ROOT/pages/pulsar-component.adoc | 6 +-
14 files changed, 237 insertions(+), 8 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
index ffcc9c5..879c934 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
@@ -34,6 +34,7 @@
"maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Maximum number of times that a message will be
redelivered before being sent to the dead letter queue. If this value is no
[...]
"negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "secret": false, "defaultValue": 60000000,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "configuration", "description": "Set the negative
acknowledgement delay" },
"numberOfConsumers": { "kind": "property", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Number of consumers - defaults to 1" },
+ "readCompacted": { "kind": "property", "displayName": "Read Compacted",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Enable compacted topic reading." },
"subscriptionInitialPosition": { "kind": "property", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", [...]
"subscriptionName": { "kind": "property", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Name of the subscription to use" },
"subscriptionTopicsMode": { "kind": "property", "displayName":
"Subscription Topics Mode", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false,
"secret": false, "defaultValue": "PersistentOnly", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration" [...]
@@ -74,6 +75,7 @@
"maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Maximum number of times that a message
will be redelivered before being sent to the dead letter queue. If this valu
[...]
"negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "secret": false, "defaultValue": 60000000,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Set the negative
acknowledgement delay" },
"numberOfConsumers": { "kind": "parameter", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
+ "readCompacted": { "kind": "parameter", "displayName": "Read Compacted",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Enable compacted topic reading." },
"subscriptionInitialPosition": { "kind": "parameter", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfigur [...]
"subscriptionName": { "kind": "parameter", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Name of the subscription to use" },
"subscriptionTopicsMode": { "kind": "parameter", "displayName":
"Subscription Topics Mode", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false,
"secret": false, "defaultValue": "PersistentOnly", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfigu [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
index db3962b..cb375e6 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
@@ -36,7 +36,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
// component options: START
-The Pulsar component supports 35 options, which are listed below.
+The Pulsar component supports 36 options, which are listed below.
@@ -55,6 +55,7 @@ The Pulsar component supports 35 options, which are listed
below.
| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false |
boolean
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. There are
2 enums and the value can be one of: EARLIEST, LATEST | LATEST |
SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
| *subscriptionTopicsMode* (consumer) | Determines to which topics this
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only
used with pattern subscriptions. There are 3 enums and the value can be one of:
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly |
RegexSubscriptionMode
@@ -107,7 +108,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (34 parameters):
+=== Query Parameters (35 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -124,6 +125,7 @@ with the following path and query parameters:
| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false |
boolean
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. There are
2 enums and the value can be one of: EARLIEST, LATEST | LATEST |
SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
| *subscriptionTopicsMode* (consumer) | Determines to which topics this
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only
used with pattern subscriptions. There are 3 enums and the value can be one of:
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly |
RegexSubscriptionMode
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index 4480dc7..bc6689f 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -30,6 +30,7 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
map.put("maxRedeliverCount", java.lang.Integer.class);
map.put("negativeAckRedeliveryDelayMicros", long.class);
map.put("numberOfConsumers", int.class);
+ map.put("readCompacted", boolean.class);
map.put("subscriptionInitialPosition",
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.class);
map.put("subscriptionName", java.lang.String.class);
map.put("subscriptionTopicsMode",
org.apache.pulsar.client.api.RegexSubscriptionMode.class);
@@ -124,6 +125,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "pulsarClient": target.setPulsarClient(property(camelContext,
org.apache.pulsar.client.api.PulsarClient.class, value)); return true;
case "pulsarmessagereceiptfactory":
case "pulsarMessageReceiptFactory":
target.setPulsarMessageReceiptFactory(property(camelContext,
org.apache.camel.component.pulsar.PulsarMessageReceiptFactory.class, value));
return true;
+ case "readcompacted":
+ case "readCompacted":
getOrCreateConfiguration(target).setReadCompacted(property(camelContext,
boolean.class, value)); return true;
case "sendtimeoutms":
case "sendTimeoutMs":
getOrCreateConfiguration(target).setSendTimeoutMs(property(camelContext,
int.class, value)); return true;
case "subscriptioninitialposition":
@@ -206,6 +209,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "pulsarClient": return target.getPulsarClient();
case "pulsarmessagereceiptfactory":
case "pulsarMessageReceiptFactory": return
target.getPulsarMessageReceiptFactory();
+ case "readcompacted":
+ case "readCompacted": return
getOrCreateConfiguration(target).isReadCompacted();
case "sendtimeoutms":
case "sendTimeoutMs": return
getOrCreateConfiguration(target).getSendTimeoutMs();
case "subscriptioninitialposition":
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 69bd2e8..be19279 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -33,6 +33,7 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
map.put("maxRedeliverCount", java.lang.Integer.class);
map.put("negativeAckRedeliveryDelayMicros", long.class);
map.put("numberOfConsumers", int.class);
+ map.put("readCompacted", boolean.class);
map.put("subscriptionInitialPosition",
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.class);
map.put("subscriptionName", java.lang.String.class);
map.put("subscriptionTopicsMode",
org.apache.pulsar.client.api.RegexSubscriptionMode.class);
@@ -117,6 +118,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "numberOfConsumers":
target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext,
int.class, value)); return true;
case "producername":
case "producerName":
target.getPulsarConfiguration().setProducerName(property(camelContext,
java.lang.String.class, value)); return true;
+ case "readcompacted":
+ case "readCompacted":
target.getPulsarConfiguration().setReadCompacted(property(camelContext,
boolean.class, value)); return true;
case "sendtimeoutms":
case "sendTimeoutMs":
target.getPulsarConfiguration().setSendTimeoutMs(property(camelContext,
int.class, value)); return true;
case "subscriptioninitialposition":
@@ -197,6 +200,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "numberOfConsumers": return
target.getPulsarConfiguration().getNumberOfConsumers();
case "producername":
case "producerName": return
target.getPulsarConfiguration().getProducerName();
+ case "readcompacted":
+ case "readCompacted": return
target.getPulsarConfiguration().isReadCompacted();
case "sendtimeoutms":
case "sendTimeoutMs": return
target.getPulsarConfiguration().getSendTimeoutMs();
case "subscriptioninitialposition":
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
index 5e70ee5..34ac4a1 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class PulsarEndpointUriFactory extends
org.apache.camel.support.component
private static final Set<String> PROPERTY_NAMES;
private static final Set<String> SECRET_PROPERTY_NAMES;
static {
- Set<String> props = new HashSet<>(38);
+ Set<String> props = new HashSet<>(39);
props.add("basicPropertyBinding");
props.add("initialSequenceId");
props.add("maxRedeliverCount");
@@ -49,6 +49,7 @@ public class PulsarEndpointUriFactory extends
org.apache.camel.support.component
props.add("messageRoutingMode");
props.add("ackTimeoutMillis");
props.add("consumerNamePrefix");
+ props.add("readCompacted");
props.add("lazyStartProducer");
props.add("subscriptionType");
props.add("subscriptionName");
diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index ffcc9c5..879c934 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -34,6 +34,7 @@
"maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Maximum number of times that a message will be
redelivered before being sent to the dead letter queue. If this value is no
[...]
"negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "secret": false, "defaultValue": 60000000,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "configuration", "description": "Set the negative
acknowledgement delay" },
"numberOfConsumers": { "kind": "property", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Number of consumers - defaults to 1" },
+ "readCompacted": { "kind": "property", "displayName": "Read Compacted",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Enable compacted topic reading." },
"subscriptionInitialPosition": { "kind": "property", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", [...]
"subscriptionName": { "kind": "property", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Name of the subscription to use" },
"subscriptionTopicsMode": { "kind": "property", "displayName":
"Subscription Topics Mode", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false,
"secret": false, "defaultValue": "PersistentOnly", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration" [...]
@@ -74,6 +75,7 @@
"maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Maximum number of times that a message
will be redelivered before being sent to the dead letter queue. If this valu
[...]
"negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "secret": false, "defaultValue": 60000000,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Set the negative
acknowledgement delay" },
"numberOfConsumers": { "kind": "parameter", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
+ "readCompacted": { "kind": "parameter", "displayName": "Read Compacted",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Enable compacted topic reading." },
"subscriptionInitialPosition": { "kind": "parameter", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfigur [...]
"subscriptionName": { "kind": "parameter", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Name of the subscription to use" },
"subscriptionTopicsMode": { "kind": "parameter", "displayName":
"Subscription Topics Mode", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false,
"secret": false, "defaultValue": "PersistentOnly", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfigu [...]
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index db3962b..cb375e6 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -36,7 +36,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
// component options: START
-The Pulsar component supports 35 options, which are listed below.
+The Pulsar component supports 36 options, which are listed below.
@@ -55,6 +55,7 @@ The Pulsar component supports 35 options, which are listed
below.
| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false |
boolean
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. There are
2 enums and the value can be one of: EARLIEST, LATEST | LATEST |
SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
| *subscriptionTopicsMode* (consumer) | Determines to which topics this
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only
used with pattern subscriptions. There are 3 enums and the value can be one of:
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly |
RegexSubscriptionMode
@@ -107,7 +108,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (34 parameters):
+=== Query Parameters (35 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -124,6 +125,7 @@ with the following path and query parameters:
| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false |
boolean
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. There are
2 enums and the value can be one of: EARLIEST, LATEST | LATEST |
SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
| *subscriptionTopicsMode* (consumer) | Determines to which topics this
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only
used with pattern subscriptions. There are 3 enums and the value can be one of:
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly |
RegexSubscriptionMode
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index a10c1a1..c43b511 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -63,6 +63,8 @@ public class PulsarConfiguration implements Cloneable {
private long ackGroupTimeMillis = 100;
@UriParam(label = "consumer", defaultValue = "LATEST")
private SubscriptionInitialPosition subscriptionInitialPosition = LATEST;
+ @UriParam(label = "consumer", defaultValue = "false")
+ private boolean readCompacted;
@UriParam(label = "consumer",
description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created")
private Integer maxRedeliverCount;
@@ -367,6 +369,17 @@ public class PulsarConfiguration implements Cloneable {
}
/**
+ * Enable compacted topic reading.
+ */
+ public boolean isReadCompacted() {
+ return readCompacted;
+ }
+
+ public void setReadCompacted(boolean readCompacted) {
+ this.readCompacted = readCompacted;
+ }
+
+ /**
* Set the baseline for the sequence ids for messages published by the producer. First message will be using
* (initialSequenceId 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if
* not otherwise specified.
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index f429980..86fe75c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -51,7 +51,8 @@ public final class CommonCreationStrategyImpl {
endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition())
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
.negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS)
- .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer));
+ .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer))
+ .readCompacted(endpointConfiguration.isReadCompacted());
if (endpointConfiguration.getMaxRedeliverCount() != null) {
DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder()
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index 146586e..d86f981 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -70,6 +70,7 @@ public class PulsarComponentTest extends CamelTestSupport {
assertEquals("subs",
endpoint.getPulsarConfiguration().getSubscriptionName());
assertEquals(SubscriptionType.EXCLUSIVE,
endpoint.getPulsarConfiguration().getSubscriptionType());
assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
+ assertFalse(endpoint.getPulsarConfiguration().isReadCompacted());
}
@Test
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java
new file mode 100644
index 0000000..e80e64a
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test consuming from a Pulsar topic through an endpoint configured
+ * with {@code readCompacted=true} and manual acknowledgement.
+ *
+ * Each test triggers a compaction of the topic through the Pulsar admin API,
+ * waits for that compaction to stop running, starts the consuming route and
+ * then publishes two messages sharing the same key.
+ */
+public class PulsarConsumerReadCompactedTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerReadCompactedTest.class);
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+                    + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000"
+                    + "&readCompacted=true"
+                    + "&negativeAckRedeliveryDelayMicros=100000")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private Producer<String> producer;
+
+    /**
+     * Removes any route left over from a previous test and creates the
+     * String producer used to publish the test messages.
+     */
+    @BeforeEach
+    public void setup() throws Exception {
+        context.removeRoute("myRoute");
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+    }
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    /**
+     * Binds a Pulsar client and a {@link PulsarComponent} using that client
+     * into the given registry under the names "pulsarClient" and "pulsar".
+     */
+    private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
+    }
+
+    private PulsarAdmin givenPulsarAdmin() throws PulsarClientException {
+        return new PulsarAdminBuilderImpl().serviceHttpUrl(getPulsarAdminUrl()).build();
+    }
+
+    /**
+     * Triggers a compaction of {@link #TOPIC_URI} and blocks until the broker
+     * no longer reports the compaction as running.
+     */
+    private void triggerCompaction() throws PulsarAdminException, PulsarClientException {
+        // The admin client is only needed for the duration of the polling, so
+        // close it afterwards instead of leaking it.
+        try (PulsarAdmin admin = givenPulsarAdmin()) {
+            final Topics topics = admin.topics();
+
+            topics.triggerCompaction(TOPIC_URI);
+            // Keep polling while the compaction is in progress. The previous
+            // negated condition returned as soon as the compaction was running
+            // and looped forever once it had already finished.
+            while (topics.compactionStatus(TOPIC_URI).status == LongRunningProcessStatus.Status.RUNNING) {
+                LOGGER.info("Waiting for compaction completeness...");
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    /**
+     * Builds the route shared by both tests: consume from the Pulsar endpoint,
+     * forward to the mock endpoint, then acknowledge the message manually
+     * through the receipt stored in the message headers.
+     */
+    private RouteBuilder acknowledgingRoute() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt
+                            = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        };
+    }
+
+    /**
+     * Publishes two messages with the key "myKey" and expects exactly one
+     * message to reach the mock endpoint within ten seconds.
+     */
+    @Test
+    public void testReadCompacted() throws Exception {
+        to.expectedMessageCount(1);
+        triggerCompaction();
+
+        context.addRoutes(acknowledgingRoute());
+
+        producer.newMessage().key("myKey").value("Hello World!").send();
+        producer.newMessage().key("myKey").value("Hello World! Again!").send();
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    /**
+     * Publishes two messages with the key "mySecondKey" and expects both of
+     * them to reach the mock endpoint within ten seconds.
+     *
+     * NOTE(review): despite its name this test consumes through the same
+     * endpoint as {@link #testReadCompacted()}, i.e. with readCompacted=true.
+     */
+    @Test
+    public void testReadNotCompacted() throws Exception {
+        to.expectedMessageCount(2);
+        triggerCompaction();
+
+        context.addRoutes(acknowledgingRoute());
+
+        producer.newMessage().key("mySecondKey").value("Hello World!").send();
+        producer.newMessage().key("mySecondKey").value("Hello World! Again!").send();
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+}
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
index 02fc71e..de3ac84 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
@@ -220,6 +220,18 @@ public interface PulsarComponentBuilderFactory {
return this;
}
/**
+ * Enable compacted topic reading.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ *
+ * @param readCompacted the value to set
+ * @return the dsl builder
+ */
+ default PulsarComponentBuilder readCompacted(boolean readCompacted) {
+ doSetProperty("readCompacted", readCompacted);
+ return this;
+ }
+ /**
* Control the initial position in the topic of a newly created
* subscription. Default is latest message.
*
@@ -578,6 +590,7 @@ public interface PulsarComponentBuilderFactory {
case "maxRedeliverCount":
getOrCreateConfiguration((PulsarComponent)
component).setMaxRedeliverCount((java.lang.Integer) value); return true;
case "negativeAckRedeliveryDelayMicros":
getOrCreateConfiguration((PulsarComponent)
component).setNegativeAckRedeliveryDelayMicros((long) value); return true;
case "numberOfConsumers":
getOrCreateConfiguration((PulsarComponent)
component).setNumberOfConsumers((int) value); return true;
+ case "readCompacted": getOrCreateConfiguration((PulsarComponent)
component).setReadCompacted((boolean) value); return true;
case "subscriptionInitialPosition":
getOrCreateConfiguration((PulsarComponent)
component).setSubscriptionInitialPosition((org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition)
value); return true;
case "subscriptionName":
getOrCreateConfiguration((PulsarComponent)
component).setSubscriptionName((java.lang.String) value); return true;
case "subscriptionTopicsMode":
getOrCreateConfiguration((PulsarComponent)
component).setSubscriptionTopicsMode((org.apache.pulsar.client.api.RegexSubscriptionMode)
value); return true;
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index ce57097..fa79a01 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -319,6 +319,31 @@ public interface PulsarEndpointBuilderFactory {
return this;
}
/**
+ * Enable compacted topic reading.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ *
+ * @param readCompacted the value to set
+ * @return the dsl builder
+ */
+ default PulsarEndpointConsumerBuilder readCompacted(
+ boolean readCompacted) {
+ doSetProperty("readCompacted", readCompacted);
+ return this;
+ }
+ /**
+ * Enable compacted topic reading.
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ *
+ * @param readCompacted the value to set, given as a String
+ * @return the dsl builder
+ */
+ default PulsarEndpointConsumerBuilder readCompacted(String
readCompacted) {
+ doSetProperty("readCompacted", readCompacted);
+ return this;
+ }
+ /**
* Control the initial position in the topic of a newly created
* subscription. Default is latest message.
*
diff --git a/docs/components/modules/ROOT/pages/pulsar-component.adoc
b/docs/components/modules/ROOT/pages/pulsar-component.adoc
index dd2c35b..0076543 100644
--- a/docs/components/modules/ROOT/pages/pulsar-component.adoc
+++ b/docs/components/modules/ROOT/pages/pulsar-component.adoc
@@ -38,7 +38,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
// component options: START
-The Pulsar component supports 35 options, which are listed below.
+The Pulsar component supports 36 options, which are listed below.
@@ -57,6 +57,7 @@ The Pulsar component supports 35 options, which are listed
below.
| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false |
boolean
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. There are
2 enums and the value can be one of: EARLIEST, LATEST | LATEST |
SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
| *subscriptionTopicsMode* (consumer) | Determines to which topics this
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only
used with pattern subscriptions. There are 3 enums and the value can be one of:
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly |
RegexSubscriptionMode
@@ -109,7 +110,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (34 parameters):
+=== Query Parameters (35 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -126,6 +127,7 @@ with the following path and query parameters:
| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false |
boolean
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. There are
2 enums and the value can be one of: EARLIEST, LATEST | LATEST |
SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
| *subscriptionTopicsMode* (consumer) | Determines to which topics this
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only
used with pattern subscriptions. There are 3 enums and the value can be one of:
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly |
RegexSubscriptionMode