This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 297e882 Add javadoc to ReactiveMessageConsumerBuilder (#27)
297e882 is described below
commit 297e882d1a2599c5657a00c4836244739f8bf134
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 10:29:23 2022 +0100
Add javadoc to ReactiveMessageConsumerBuilder (#27)
---
.../client/api/ReactiveMessageConsumerBuilder.java | 481 ++++++++++++++++++++-
1 file changed, 480 insertions(+), 1 deletion(-)
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index e09179c..b86d5f9 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -20,34 +20,69 @@ import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.scheduler.Scheduler;
+/**
+ * Builder interface for {@link ReactiveMessageConsumer}.
+ *
+ * @param <T> the message payload type
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
public interface ReactiveMessageConsumerBuilder<T> {
+ /**
+ * Apply a consumer spec to configure the consumer.
+ * @param consumerSpec the consumer spec to apply
+ * @return the consumer builder instance
+ */
default ReactiveMessageConsumerBuilder<T>
applySpec(ReactiveMessageConsumerSpec consumerSpec) {
getMutableSpec().applySpec(consumerSpec);
return this;
}
+ /**
+ * Converts this builder to an immutable reactive consumer spec.
+ * @return the reactive consumer spec.
+ */
ReactiveMessageConsumerSpec toImmutableSpec();
+ /**
+ * Converts this builder to a mutable reactive consumer spec.
+ * @return the reactive consumer spec.
+ */
MutableReactiveMessageConsumerSpec getMutableSpec();
+ /**
+ * Adds a topic this consumer will subscribe on.
+ * @param topicName a topic that the consumer will subscribe on
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#topic(String...)
+ */
default ReactiveMessageConsumerBuilder<T> topic(String topicName) {
getMutableSpec().getTopicNames().add(topicName);
return this;
}
+ /**
+ * Adds topics this consumer will subscribe on.
+ * @param topicNames a set of topic that the consumer will subscribe on
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#topic(String...)
+ */
default ReactiveMessageConsumerBuilder<T> topic(String... topicNames) {
for (String topicName : topicNames) {
getMutableSpec().getTopicNames().add(topicName);
@@ -55,64 +90,189 @@ public interface ReactiveMessageConsumerBuilder<T> {
return this;
}
+ /**
+ * Sets the topics this consumer will subscribe on.
+ * @param topicNames a set of topic that the consumer will subscribe on
+ * @return the consumer builder instance
+ */
default ReactiveMessageConsumerBuilder<T> topicNames(List<String>
topicNames) {
getMutableSpec().setTopicNames(topicNames);
return this;
}
+ /**
+ * Sets a pattern for topics that this consumer will subscribe on.
+ *
+ * <p>
+ * The pattern will be applied to subscribe to all the topics, within a
single
+ * namespace, that will match the pattern.
+ *
+ * <p>
+ * The consumer will automatically subscribe to topics created after
itself.
+ * @param topicsPattern a regular expression to select a list of topics
to subscribe
+ * to
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#topicsPattern(Pattern)
+ */
default ReactiveMessageConsumerBuilder<T> topicsPattern(Pattern
topicsPattern) {
getMutableSpec().setTopicsPattern(topicsPattern);
return this;
}
+ /**
+ * Sets to which topics this consumer should be subscribed to -
Persistent,
+ * Mon-Persistent, or both. Only used with pattern subscriptions.
+ * @param topicsPatternSubscriptionMode pattern subscription mode
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#subscriptionTopicsMode(RegexSubscriptionMode)
+ */
default ReactiveMessageConsumerBuilder<T> topicsPatternSubscriptionMode(
RegexSubscriptionMode topicsPatternSubscriptionMode) {
getMutableSpec().setTopicsPatternSubscriptionMode(topicsPatternSubscriptionMode);
return this;
}
+ /**
+ * Sets the topics auto discovery period when using a pattern for
topics consumer.
+ * @param topicsPatternAutoDiscoveryPeriod duration between checks for
new topics
+ * matching pattern set with {@link #topicsPattern(Pattern)}
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#patternAutoDiscoveryPeriod(int, TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T>
topicsPatternAutoDiscoveryPeriod(
Duration topicsPatternAutoDiscoveryPeriod) {
getMutableSpec().setTopicsPatternAutoDiscoveryPeriod(topicsPatternAutoDiscoveryPeriod);
return this;
}
+ /**
+ * Sets the subscription name for this consumer.
+ * <p>
+ * This argument is required when constructing the consumer.
+ * @param subscriptionName the name of the subscription that this
consumer should
+ * attach to
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#subscriptionName(String)
+ */
default ReactiveMessageConsumerBuilder<T> subscriptionName(String
subscriptionName) {
getMutableSpec().setSubscriptionName(subscriptionName);
return this;
}
+ /**
+ * Sets the subscription mode to be used when subscribing to the topic.
+ *
+ * <p>
+ * Options are:
+ * <ul>
+ * <li>{@link SubscriptionMode#Durable} (Default)</li>
+ * <li>{@link SubscriptionMode#NonDurable}</li>
+ * </ul>
+ * @param subscriptionMode the subscription mode value
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#subscriptionMode(SubscriptionMode)
+ */
default ReactiveMessageConsumerBuilder<T>
subscriptionMode(SubscriptionMode subscriptionMode) {
getMutableSpec().setSubscriptionMode(subscriptionMode);
return this;
}
+ /**
+ * Sets the subscription type to be used when subscribing to the topic.
+ *
+ * <p>
+ * Options are:
+ * <ul>
+ * <li>{@link SubscriptionType#Exclusive} (Default)</li>
+ * <li>{@link SubscriptionType#Failover}</li>
+ * <li>{@link SubscriptionType#Shared}</li>
+ * </ul>
+ * @param subscriptionType the subscription type value
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#subscriptionType(SubscriptionType)
+ */
default ReactiveMessageConsumerBuilder<T>
subscriptionType(SubscriptionType subscriptionType) {
getMutableSpec().setSubscriptionType(subscriptionType);
return this;
}
+ /**
+ * Sets the initial position of the subscription for the consumer.
+ * @param subscriptionInitialPosition the position where to initialize
a newly created
+ * subscription
+ * @return the consumer builder instance
+ * @see
ConsumerBuilder#subscriptionInitialPosition(SubscriptionInitialPosition)
+ */
default ReactiveMessageConsumerBuilder<T> subscriptionInitialPosition(
SubscriptionInitialPosition
subscriptionInitialPosition) {
getMutableSpec().setSubscriptionInitialPosition(subscriptionInitialPosition);
return this;
}
+ /**
+ * Sets the KeyShared subscription policy for the consumer.
+ *
+ * <p>
+ * By default, KeyShared subscription uses auto split hash range to
maintain
+ * consumers. If you want to set a different KeyShared policy, you can
set it like
+ * this:
+ *
+ * <pre>
+ * client.messageConsumer(Schema.BYTES)
+ *
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 10)))
+ * .build();
+ * </pre> For details about sticky hash range policy, please see
+ * {@link KeySharedPolicy.KeySharedPolicySticky}.
+ *
+ * <p>
+ * Or <pre>
+ * client.messageConsumer(Schema.BYTES)
+ * .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+ * .build();
+ * </pre> For details about auto split hash range policy, please see
+ * {@link KeySharedPolicy.KeySharedPolicyAutoSplit}.
+ * @param keySharedPolicy the KeyShared policy to set
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#keySharedPolicy(KeySharedPolicy)
+ */
default ReactiveMessageConsumerBuilder<T>
keySharedPolicy(KeySharedPolicy keySharedPolicy) {
getMutableSpec().setKeySharedPolicy(keySharedPolicy);
return this;
}
+ /**
+ * Sets whether the subscription shall be replicated.
+ * @param replicateSubscriptionState whether the subscription shall be
replicated
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#replicateSubscriptionState(boolean)
+ */
default ReactiveMessageConsumerBuilder<T>
replicateSubscriptionState(boolean replicateSubscriptionState) {
getMutableSpec().setReplicateSubscriptionState(replicateSubscriptionState);
return this;
}
+ /**
+ * Sets the subscription properties for this subscription. Properties
are immutable,
+ * and consumers under the same subscription will fail to create a
subscription if
+ * they use different properties.
+ * @param subscriptionProperties the subscription properties to set
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#subscriptionProperties(Map)
+ */
default ReactiveMessageConsumerBuilder<T>
subscriptionProperties(Map<String, String> subscriptionProperties) {
getMutableSpec().setSubscriptionProperties(subscriptionProperties);
return this;
}
+ /**
+ * Adds a subscription property for this subscription. Properties are
immutable, and
+ * consumers under the same subscription will fail to create a
subscription if they
+ * use different properties.
+ * @param key the key of the property to add
+ * @param value the value of the property to add
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#subscriptionProperties(Map)
+ */
default ReactiveMessageConsumerBuilder<T> subscriptionProperty(String
key, String value) {
if (getMutableSpec().getSubscriptionProperties() == null) {
getMutableSpec().setSubscriptionProperties(new
LinkedHashMap<>());
@@ -121,16 +281,49 @@ public interface ReactiveMessageConsumerBuilder<T> {
return this;
}
+ /**
+ * Sets the consumer name.
+ *
+ * <p>
+ * Consumer name is informative and it can be used to indentify a
particular consumer
+ * instance from the topic stats.
+ * @param consumerName the consumer name
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#consumerName(String)
+ */
default ReactiveMessageConsumerBuilder<T> consumerName(String
consumerName) {
getMutableSpec().setConsumerName(consumerName);
return this;
}
+ /**
+ * Sets the properties for the consumer.
+ *
+ * <p>
+ * Properties are application defined metadata that can be attached to
the consumer.
+ * When getting the topic stats, this metadata will be associated to
the consumer
+ * stats for easier identification.
+ * @param properties the properties to set
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#properties(Map)
+ */
default ReactiveMessageConsumerBuilder<T> properties(Map<String,
String> properties) {
getMutableSpec().setProperties(properties);
return this;
}
+ /**
+ * Add a property to the consumer.
+ *
+ * <p>
+ * Properties are application defined metadata that can be attached to
the consumer.
+ * When getting the topic stats, this metadata will be associated to
the consumer
+ * stats for easier identification.
+ * @param key the key of the property to add
+ * @param value the value of the property to add
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#property(String, String)
+ */
default ReactiveMessageConsumerBuilder<T> property(String key, String
value) {
if (getMutableSpec().getProperties() == null) {
getMutableSpec().setProperties(new LinkedHashMap<>());
@@ -139,31 +332,147 @@ public interface ReactiveMessageConsumerBuilder<T> {
return this;
}
+ /**
+ * Sets the priority level for the consumer.
+ *
+ * <b>Shared subscription</b> Sets the priority level for the shared
subscription
+ * consumers to which the broker gives more priority while dispatching
messages. Here,
+ * the broker follows descending priorities. (eg: 0=max-priority, 1,
2,..)
+ *
+ * <p>
+ * In Shared subscription mode, the broker will first dispatch messages
to max
+ * priority-level consumers if they have permits, else the broker will
consider next
+ * priority level consumers.
+ *
+ * <p>
+ * If the subscription has consumer-A with priorityLevel 0 and
consumer-B with
+ * priorityLevel 1 then the broker will dispatch messages only to
consumer-A until it
+ * runs out of permits, then the broker will start dispatching messages
to consumer-B.
+ *
+ * <p>
+ * <pre>
+ * Consumer PriorityLevel Permits
+ * C1 0 2
+ * C2 0 1
+ * C3 0 1
+ * C4 1 2
+ * C5 1 1
+ * Order in which the broker dispatches messages to consumers: C1, C2,
C3, C1, C4, C5, C4
+ * </pre>
+ *
+ * <p>
+ * <b>Failover subscription</b> The broker selects the active consumer
for a
+ * failover-subscription based on the consumer priority-level and
lexicographical
+ * sorting of consumer names. eg: <pre>
+ * 1. Active consumer = C1 : Same priority-level and lexicographical
sorting
+ * Consumer PriorityLevel Name
+ * C1 0 aaa
+ * C2 0 bbb
+ *
+ * 2. Active consumer = C2 : Consumer with highest priority
+ * Consumer PriorityLevel Name
+ * C1 1 aaa
+ * C2 0 bbb
+ *
+ * Partitioned-topics:
+ * The broker evenly assigns partitioned topics to highest priority
consumers.
+ * </pre>
+ * @param priorityLevel the priority level of this consumer
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#priorityLevel(int)
+ */
default ReactiveMessageConsumerBuilder<T> priorityLevel(Integer
priorityLevel) {
getMutableSpec().setPriorityLevel(priorityLevel);
return this;
}
+ /**
+ * Sets whether the consumer will read messages from the compacted
topic rather than
+ * reading the full message backlog of the topic. This means that, if
the topic has
+ * been compacted, the consumer will only see the latest value for each
key in the
+ * topic, up until the point in the topic message backlog that has been
compacted.
+ * Beyond that point, the messages will be sent as normal.
+ *
+ * <p>
+ * readCompacted can only be enabled for subscriptions to persistent
topics, which
+ * have a single active consumer (i.e. failure or exclusive
subscriptions). Attempting
+ * to enable it on subscriptions to a non-persistent topic or on a
shared
+ * subscription, will lead to the subscription call throwing a
PulsarClientException.
+ * @param readCompacted whether to read from the compacted topic
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#readCompacted(boolean)
+ */
default ReactiveMessageConsumerBuilder<T> readCompacted(boolean
readCompacted) {
getMutableSpec().setReadCompacted(readCompacted);
return this;
}
+ /**
+ * Enables or disables the batch index acknowledgment. For the batch
index
+ * acknowledgment feature to work, it must also be enabled on the
broker.
+ * @param batchIndexAckEnabled whether to enable batch index
acknowledgment
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#enableBatchIndexAcknowledgment(boolean)
+ */
default ReactiveMessageConsumerBuilder<T> batchIndexAckEnabled(boolean
batchIndexAckEnabled) {
getMutableSpec().setBatchIndexAckEnabled(batchIndexAckEnabled);
return this;
}
+ /**
+ * Sets the timeout for unacknowledged messages, truncated to the
nearest millisecond.
+ * The timeout needs to be greater than 1 second.
+ *
+ * <p>
+ * By default, the acknowledge timeout is disabled and that means that
messages
+ * delivered to a consumer will not be re-delivered unless the consumer
crashes.
+ *
+ * <p>
+ * When enabling the acknowledge timeout, if a message is not
acknowledged within the
+ * specified timeout it will be re-delivered to the consumer (possibly
to a different
+ * consumer in case of a shared subscription).
+ * @param ackTimeout the timeout for unacknowledged messages.
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#ackTimeout(long, TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T> ackTimeout(Duration
ackTimeout) {
getMutableSpec().setAckTimeout(ackTimeout);
return this;
}
+ /**
+ * Sets the granularity of the ack-timeout redelivery.
+ *
+ * <p>
+ * By default, the tick time is set to 1 second. Using an higher tick
time will reduce
+ * the memory overhead to track messages when the ack-timeout is set to
bigger values
+ * (eg: 1hour).
+ * @param ackTimeoutTickTime the minimum precision for the acknowledge
timeout
+ * messages tracker
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#ackTimeoutTickTime(long, TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T> ackTimeoutTickTime(Duration
ackTimeoutTickTime) {
getMutableSpec().setAckTimeoutTickTime(ackTimeoutTickTime);
return this;
}
+ /**
+ * Sets the duration for grouping the acknowledgement messages.
+ *
+ * <p>
+ * By default, the consumer will use a 100 ms grouping time to send the
+ * acknowledgements to the broker.
+ *
+ * <p>
+ * Setting a group time of 0, will send the acknowledgements
immediately. A longer
+ * acknowledgement group time will be more efficient at the expense of
a slight
+ * increase in message re-deliveries after a failure.
+ * @param acknowledgementsGroupTime the duration for grouping the
acknowledgement
+ * messages
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#acknowledgmentGroupTime(long, TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T>
acknowledgementsGroupTime(Duration acknowledgementsGroupTime) {
getMutableSpec().setAcknowledgementsGroupTime(acknowledgementsGroupTime);
return this;
@@ -175,81 +484,251 @@ public interface ReactiveMessageConsumerBuilder<T> {
* allowing the acknowledges and message processing to interleave.
Defaults to true.
* @param acknowledgeAsynchronously when set to true, ignores the
acknowledge
* operation completion
- * @return the current ReactiveMessageConsumerFactory instance (this)
+ * @return the consumer builder instance
*/
default ReactiveMessageConsumerBuilder<T>
acknowledgeAsynchronously(boolean acknowledgeAsynchronously) {
getMutableSpec().setAcknowledgeAsynchronously(acknowledgeAsynchronously);
return this;
}
+ /**
+ * Sets the scheduler to use to handle acknowledgements.
+ * @param acknowledgeScheduler the scheduler to use to handle
acknowledgements
+ * @return the consumer builder instance
+ */
default ReactiveMessageConsumerBuilder<T>
acknowledgeScheduler(Scheduler acknowledgeScheduler) {
getMutableSpec().setAcknowledgeScheduler(acknowledgeScheduler);
return this;
}
+ /**
+ * Sets the delay to wait before re-delivering messages that have
failed to be
+ * processed.
+ *
+ * <p>
+ * When the application uses {@link
MessageResult#negativeAcknowledge(Message)}, the
+ * failed message will be redelivered after a fixed timeout. The
default is 1 min.
+ * @param negativeAckRedeliveryDelay the redelivery delay for failed
messages
+ * @return the consumer builder instance
+ * @see MessageResult#negativeAcknowledge(Message)
+ * @see ConsumerBuilder#negativeAckRedeliveryDelay(long, TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T>
negativeAckRedeliveryDelay(Duration negativeAckRedeliveryDelay) {
getMutableSpec().setNegativeAckRedeliveryDelay(negativeAckRedeliveryDelay);
return this;
}
+ /**
+ * Sets a dead letter policy for the consumer.
+ *
+ * <p>
+ * By default messages are redelivered indefinitely if they are not
acknowledged. By
+ * using a dead letter mechanism, messages that have reached the max
redelivery count
+ * will be acknowledged automatically and send to the configured dead
letter topic.
+ *
+ * <p>
+ * You can enable the dead letter mechanism by setting a dead letter
policy. Example:
+ * <pre>
+ * client.messageConsumer(Schema.BYTES)
+ *
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
+ * .build();
+ * </pre> Default the dead letter topic name is
{TopicName}-{Subscription}-DLQ. You
+ * can set o set a custom dead letter topic name like this: <pre>
+ * client.messageConsumer(Schema.BYTES)
+ * .deadLetterPolicy(DeadLetterPolicy
+ * .builder()
+ * .maxRedeliverCount(10)
+ * .deadLetterTopic("your-topic-name")
+ * .build())
+ * .build();
+ * </pre> When a dead letter policy is specified, and no
acknowledgement timeout is
+ * specified, then the acknowledgement timeout will be set to 30000
millisecond.
+ * @param deadLetterPolicy the dead letter policy to use
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)
+ */
default ReactiveMessageConsumerBuilder<T>
deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
getMutableSpec().setDeadLetterPolicy(deadLetterPolicy);
return this;
}
+ /**
+ * Sets whether automatic routing to retry letter topic and dead letter
topic are
+ * enabled.
+ * @param retryLetterTopicEnable whether to automatic retry/dead-letter
topics are
+ * enabled
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#enableRetry(boolean)
+ */
default ReactiveMessageConsumerBuilder<T>
retryLetterTopicEnable(boolean retryLetterTopicEnable) {
getMutableSpec().setRetryLetterTopicEnable(retryLetterTopicEnable);
return this;
}
+ /**
+ * Sets the size of the consumer receiver queue.
+ *
+ * <p>
+ * The consumer receiver queue controls how many messages can be
accumulated by the
+ * {@link ReactiveMessageConsumer} before backpressure triggers. Using
a higher value
+ * could potentially increase the consumer throughput at the expense of
bigger memory
+ * utilization.
+ *
+ * <p>
+ * <b>Setting the consumer queue size to zero</b>
+ * <ul>
+ * <li>Decreases the throughput of the consumer, by disabling
pre-fetching of
+ * messages. This approach improves the message distribution on shared
subscription,
+ * by pushing messages only to the consumers that are ready to process
them./li>
+ * <li>Doesn't support batched messages: if the consumer receives any
batched message,
+ * it will close the connection with the broker and
+ * {@link ReactiveMessageConsumer#consumeOne},
+ * {@link ReactiveMessageConsumer#consumeMany} will emit an error. <b>
the consumer
+ * will not be able receive any further message unless the batch
message is
+ * removed</b></li>
+ * </ul>
+ * The default value is {@code 1000} messages and should be good for
most use cases.
+ * @param receiverQueueSize the receiver queue size value
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#receiverQueueSize(int)
+ */
default ReactiveMessageConsumerBuilder<T> receiverQueueSize(Integer
receiverQueueSize) {
getMutableSpec().setReceiverQueueSize(receiverQueueSize);
return this;
}
+ /**
+ * Sets the maximum total receiver queue size across partitons.
+ *
+ * <p>
+ * This setting is used to reduce the receiver queue size for
individual partitions
+ * {@link #receiverQueueSize(Integer)} if the total exceeds this value
(default:
+ * 50000). The purpose of this setting is to have an upper-limit on the
number of
+ * messages that a consumer can be pushed at once from a broker, across
all the
+ * partitions.
+ * @param maxTotalReceiverQueueSizeAcrossPartitions the maximum pending
messages
+ * across all the partitions
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#maxTotalReceiverQueueSizeAcrossPartitions(int)
+ */
default ReactiveMessageConsumerBuilder<T>
maxTotalReceiverQueueSizeAcrossPartitions(
Integer maxTotalReceiverQueueSizeAcrossPartitions) {
getMutableSpec().setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
return this;
}
+ /**
+ * Sets whether the consumer shall automatically subscribe to new
partitions added to
+ * the topic. This is only for partitioned topics.
+ * @param autoUpdatePartitions whether to automatically subscribe to
new partitions
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#autoUpdatePartitions(boolean)
+ */
default ReactiveMessageConsumerBuilder<T> autoUpdatePartitions(boolean
autoUpdatePartitions) {
getMutableSpec().setAutoUpdatePartitions(autoUpdatePartitions);
return this;
}
+ /**
+ * Sets the interval for checking partitions updates <i>(default: 1
minute)</i>. This
+ * only applies if {@link #autoUpdatePartitions} is enabled.
+ * @param autoUpdatePartitionsInterval the interval for checking
partitions updates
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#autoUpdatePartitionsInterval(int, TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T>
autoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
getMutableSpec().setAutoUpdatePartitionsInterval(autoUpdatePartitionsInterval);
return this;
}
+ /**
+ * Sets the key reader to be used to decrypt the message payloads.
+ * @param cryptoKeyReader the key reader to be used to decrypt the
message payloads.
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#cryptoKeyReader(CryptoKeyReader)
+ */
default ReactiveMessageConsumerBuilder<T>
cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
getMutableSpec().setCryptoKeyReader(cryptoKeyReader);
return this;
}
+ /**
+ * Sets the action the consumer will take in case of decryption
failures.
+ * @param cryptoFailureAction the action the consumer will take in case
of decryption
+ * failures
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)
+ */
default ReactiveMessageConsumerBuilder<T>
cryptoFailureAction(ConsumerCryptoFailureAction cryptoFailureAction) {
getMutableSpec().setCryptoFailureAction(cryptoFailureAction);
return this;
}
+ /**
+ * Sets the maximum pending chunked messages. Consumer buffers chunk
messages into
+ * memory until it receives all the chunks of the original message.
While consuming
+ * chunk-messages, chunks from same message might not be contiguous in
the stream and
+ * they might be mixed with other messages' chunks. so, consumer has to
maintain
+ * multiple buffers to manage chunks coming from different messages.
This mainly
+ * happens when multiple publishers are publishing messages on the
topic concurrently
+ * or publisher failed to publish all chunks of the messages.
+ *
+ * <pre>
+ * eg: M1-C1, M2-C1, M1-C2, M2-C2
+ * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1
and M2-C2 messages belong to M2 message.
+ * </pre> Buffering large number of outstanding uncompleted chunked
messages can
+ * create memory pressure. It can be guarded by providing a
+ * {@code maxPendingChunkedMessage} threshold. Once the consumer
reaches this
+ * threshold, it drops the outstanding unchunked-messages by silently
acknowledging or
+ * asking the broker to redeliver later by marking it unacknowledged.
This behavior
+ * can be controlled by setting {@link
#autoAckOldestChunkedMessageOnQueueFull} The
+ * default value is 10.
+ * @param maxPendingChunkedMessage the maximum pending chunked messages.
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#maxPendingChunkedMessage(int)
+ */
default ReactiveMessageConsumerBuilder<T>
maxPendingChunkedMessage(Integer maxPendingChunkedMessage) {
getMutableSpec().setMaxPendingChunkedMessage(maxPendingChunkedMessage);
return this;
}
+ /**
+ * Sets whether non-chunked messages are silently acknowledged when
+ * {@code maxPendingChunkedMessage} is reached. Buffering large number
of outstanding
+ * uncompleted chunked messages can create memory pressure. It can be
guarded by
+ * providing {@link #maxPendingChunkedMessage} threshold. Once the
consumer reaches
+ * this threshold, it drops the outstanding non-chunked messages by
silently
+ * acknowledging if autoAckOldestChunkedMessageOnQueueFull is true or
else it marks
+ * them for redelivery. Defaults to false.
+ * @param autoAckOldestChunkedMessageOnQueueFull whether non-chunked
messages are
+ * silently acknowledged
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#autoAckOldestChunkedMessageOnQueueFull(boolean)
+ */
default ReactiveMessageConsumerBuilder<T>
autoAckOldestChunkedMessageOnQueueFull(
boolean autoAckOldestChunkedMessageOnQueueFull) {
getMutableSpec().setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
return this;
}
+ /**
+ * Sets the duration after which incomplete chunked messages are
expired (happens for
+ * instance if the producer fails to publish all the chunks).
+ * @param expireTimeOfIncompleteChunkedMessage the duration after which
incomplete
+ * chunked messages are expired
+ * @return the consumer builder instance
+ * @see ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long,
TimeUnit)
+ */
default ReactiveMessageConsumerBuilder<T>
expireTimeOfIncompleteChunkedMessage(
Duration expireTimeOfIncompleteChunkedMessage) {
getMutableSpec().setExpireTimeOfIncompleteChunkedMessage(expireTimeOfIncompleteChunkedMessage);
return this;
}
+ /**
+ * Builds the reactive message consumer.
+ * @return the reactive message consumer
+ */
ReactiveMessageConsumer<T> build();
}