This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d79cd04 Add negative ack redelivery backoff. (#12566) d79cd04 is described below commit d79cd0479eabebef2ce72eca1330af103115f67f Author: hanmz <gunn...@tencent.com> AuthorDate: Thu Nov 4 14:15:17 2021 +0800 Add negative ack redelivery backoff. (#12566) ### Motivation Add negative ack redelivery backoff. ### Modifications - add new `NegativeAckBackoff` interface - expose `egativeAckRedeliveryBackoff` in ConsumerBulider - add unit test case --- .../pulsar/client/impl/NegativeAcksTest.java | 102 +++++++++++++++++++++ .../pulsar/client/api/ConsumerConfiguration.java | 17 ++++ .../apache/pulsar/client/api/ConsumerBuilder.java | 14 +++ .../client/api/NegativeAckRedeliveryBackoff.java | 40 ++++++++ .../pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++ .../apache/pulsar/client/impl/ConsumerImpl.java | 8 ++ .../client/impl/MultiTopicsConsumerImpl.java | 10 ++ .../NegativeAckRedeliveryExponentialBackoff.java | 94 +++++++++++++++++++ .../pulsar/client/impl/NegativeAcksTracker.java | 46 +++++++++- .../impl/conf/ConsumerConfigurationData.java | 4 + .../api/NegativeAckRedeliveryBackoffTest.java | 55 +++++++++++ .../client/impl/ConsumerBuilderImplTest.java | 8 ++ 12 files changed, 405 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 5eb43af..638a969 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; @@ -154,4 +155,105 @@ public class NegativeAcksTest extends ProducerConsumerBase { consumer.close(); producer.close(); } + + @DataProvider(name = "variationsBackoff") + public static Object[][] variationsBackoff() { + return new Object[][] { + // batching / partitions / subscription-type / min-nack-time-ms/ max-nack-time-ms / ack-timeout + { false, false, SubscriptionType.Shared, 100, 1000, 0 }, + { false, false, SubscriptionType.Failover, 100, 1000, 0 }, + { false, true, SubscriptionType.Shared, 100, 1000, 0 }, + { false, true, SubscriptionType.Failover, 100, 1000, 0 }, + { true, false, SubscriptionType.Shared, 100, 1000, 0 }, + { true, false, SubscriptionType.Failover, 100, 1000, 0 }, + { true, true, SubscriptionType.Shared, 100, 1000, 0 }, + { true, true, SubscriptionType.Failover, 100, 1000, 0 }, + + { false, false, SubscriptionType.Shared, 0, 1000, 0 }, + { false, false, SubscriptionType.Failover, 0, 1000, 0 }, + { false, true, SubscriptionType.Shared, 0, 1000, 0 }, + { false, true, SubscriptionType.Failover, 0, 1000, 0 }, + { true, false, SubscriptionType.Shared, 0, 1000, 0 }, + { true, false, SubscriptionType.Failover, 0, 1000, 0 }, + { true, true, SubscriptionType.Shared, 0, 1000, 0 }, + { true, true, SubscriptionType.Failover, 0, 1000, 0 }, + + { false, false, SubscriptionType.Shared, 100, 1000, 1000 }, + { false, false, SubscriptionType.Failover, 100, 1000, 1000 }, + { false, true, SubscriptionType.Shared, 100, 1000, 1000 }, + { false, true, SubscriptionType.Failover, 100, 1000, 1000 }, + { true, false, SubscriptionType.Shared, 100, 1000, 1000 }, + { true, false, SubscriptionType.Failover, 100, 1000, 1000 }, + { true, true, SubscriptionType.Shared, 100, 1000, 1000 }, + { true, true, SubscriptionType.Failover, 100, 1000, 1000 }, + + { false, false, SubscriptionType.Shared, 0, 1000, 1000 }, + { false, false, SubscriptionType.Failover, 0, 1000, 1000 }, + { false, true, SubscriptionType.Shared, 0, 1000, 1000 }, + { false, true, SubscriptionType.Failover, 0, 1000, 1000 }, + { true, false, SubscriptionType.Shared, 0, 1000, 1000 }, + { true, false, SubscriptionType.Failover, 0, 1000, 1000 }, + { true, true, SubscriptionType.Shared, 0, 1000, 1000 }, + { true, true, SubscriptionType.Failover, 0, 1000, 1000 }, + }; + } + + @Test(dataProvider = "variationsBackoff") + public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, SubscriptionType subscriptionType, + int minNackTimeMs, int maxNackTimeMs, int ackTimeout) + throws Exception { + log.info("Test negative acks with back off batching={} partitions={} subType={} minNackTimeMs={}, " + + "maxNackTimeMs={}", batching, usePartitions, subscriptionType, minNackTimeMs, maxNackTimeMs); + String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBackoff"); + + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(subscriptionType) + .negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder() + .minNackTimeMs(minNackTimeMs) + .maxNackTimeMs(maxNackTimeMs) + .build()) + .ackTimeout(ackTimeout, TimeUnit.MILLISECONDS) + .subscribe(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(batching) + .create(); + + Set<String> sentMessages = new HashSet<>(); + + final int N = 10; + for (int i = 0; i < N; i++) { + String value = "test-" + i; + producer.sendAsync(value); + sentMessages.add(value); + } + producer.flush(); + + for (int i = 0; i < N; i++) { + Message<String> msg = consumer.receive(); + consumer.negativeAcknowledge(msg); + } + + Set<String> receivedMessages = new HashSet<>(); + + // All the messages should be received again + for (int i = 0; i < N; i++) { + Message<String> msg = consumer.receive(); + receivedMessages.add(msg.getValue()); + consumer.acknowledge(msg); + } + + assertEquals(receivedMessages, sentMessages); + + // There should be no more messages + assertNull(consumer.receive(100, TimeUnit.MILLISECONDS)); + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 291cce3..3de0187 100644 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -375,4 +375,21 @@ public class ConsumerConfiguration implements Serializable { public SubscriptionInitialPosition getSubscriptionInitialPosition(){ return conf.getSubscriptionInitialPosition(); } + + /** + * @return the configured {@link NegativeAckRedeliveryBackoff} for the consumer + */ + public NegativeAckRedeliveryBackoff getNegativeAckRedeliveryBackoff() { + return conf.getNegativeAckRedeliveryBackoff(); + } + + /** + * @param negativeAckRedeliveryBackoff the negative ack redelivery backoff policy. + * Default value is: NegativeAckRedeliveryExponentialBackoff + * @return the {@link ConsumerConfiguration} + */ + public ConsumerConfiguration setNegativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff) { + conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff); + return this; + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 3c3ce17..af1ece8 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -749,4 +749,18 @@ public interface ConsumerBuilder<T> extends Cloneable { * Default: null */ ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); + + /** + * Notice: the negativeAckRedeliveryBackoff will not work with `consumer.negativeAcknowledge(MessageId messageId)` + * because we are not able to get the redelivery count from the message ID. + * + * <p>Example: + * <pre> + * client.newConsumer().negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder() + * .minNackTimeMs(1000) + * .maxNackTimeMs(60 * 1000) + * .build()).subscribe(); + * </pre> + */ + ConsumerBuilder<T> negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java new file mode 100644 index 0000000..8e19c85 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Interface for custom message is negativeAcked policy, users can specify a {@link NegativeAckRedeliveryBackoff} for + * a consumer. + * + * Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the + * {@link NegativeAckRedeliveryBackoff}, which means the message might get redelivered earlier than the delay time + * from the backoff. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface NegativeAckRedeliveryBackoff extends Serializable { + /** + * @param redeliveryCount indicates the number of times the message was redelivered + */ + long next(int redeliveryCount); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index cbfc27d..d08dbda 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.api.RegexSubscriptionMode; @@ -486,4 +487,11 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { conf.setPayloadProcessor(payloadProcessor); return this; } + + @Override + public ConsumerBuilder<T> negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff) { + checkArgument(negativeAckRedeliveryBackoff != null, "negativeAckRedeliveryBackoff must not be null."); + conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8245de6..79aad9c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -689,6 +689,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } @Override + public void negativeAcknowledge(Message<?> message) { + negativeAcksTracker.add(message); + + // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" + unAckedMessageTracker.remove(message.getMessageId()); + } + + @Override public void connectionOpened(final ClientCnx cnx) { previousExceptions.clear(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 21ae2d7..f765e5e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -523,6 +523,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } @Override + public void negativeAcknowledge(Message<?> message) { + MessageId messageId = message.getMessageId(); + checkArgument(messageId instanceof TopicMessageIdImpl); + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + + ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName()); + consumer.negativeAcknowledge(message); + } + + @Override public CompletableFuture<Void> unsubscribeAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil.failedFuture( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java new file mode 100644 index 0000000..9b0eb7a --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import com.google.common.base.Preconditions; +import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff; + +/** + * NegativeAckRedeliveryExponentialBackoff + */ +public class NegativeAckRedeliveryExponentialBackoff implements NegativeAckRedeliveryBackoff { + + private final long minNackTimeMs; + private final long maxNackTimeMs; + private final int maxBitShift; + + private NegativeAckRedeliveryExponentialBackoff(long minNackTimeMs, long maxNackTimeMs) { + this.minNackTimeMs = minNackTimeMs; + this.maxNackTimeMs = maxNackTimeMs; + + for (int i = 0; ; ) { + if (this.minNackTimeMs << ++i <= 0) { + this.maxBitShift = i; + break; + } + } + } + + public static NegativeAckRedeliveryExponentialBackoff.NegativeAckRedeliveryExponentialBackoffBuilder builder() { + return new NegativeAckRedeliveryExponentialBackoff.NegativeAckRedeliveryExponentialBackoffBuilder(); + } + + public long getMinNackTimeMs() { + return this.minNackTimeMs; + } + + public long getMaxNackTimeMs() { + return this.maxNackTimeMs; + } + + @Override + public long next(int redeliveryCount) { + if (redeliveryCount <= 0 || minNackTimeMs <= 0) { + return this.minNackTimeMs; + } + + if (this.maxBitShift <= redeliveryCount) { + return this.maxNackTimeMs; + } + + return Math.min(this.minNackTimeMs << redeliveryCount, this.maxNackTimeMs); + } + + /** + * Builder of NegativeAckRedeliveryExponentialBackoff. + */ + public static class NegativeAckRedeliveryExponentialBackoffBuilder { + private long minNackTimeMs = 1000 * 10; + private long maxNackTimeMs = 1000 * 60 * 10; + + public NegativeAckRedeliveryExponentialBackoffBuilder minNackTimeMs(long minNackTimeMs) { + this.minNackTimeMs = minNackTimeMs; + return this; + } + + public NegativeAckRedeliveryExponentialBackoffBuilder maxNackTimeMs(long maxNackTimeMs) { + this.maxNackTimeMs = maxNackTimeMs; + return this; + } + + public NegativeAckRedeliveryExponentialBackoff build() { + Preconditions.checkArgument(minNackTimeMs >= 0, "min nack time must be >= 0"); + Preconditions.checkArgument(maxNackTimeMs >= minNackTimeMs, + "max nack time must be >= minNackTimeMs"); + return new NegativeAckRedeliveryExponentialBackoff(minNackTimeMs, maxNackTimeMs); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index a062009..5accee4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -27,7 +27,9 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap; @@ -39,6 +41,7 @@ class NegativeAcksTracker implements Closeable { private final Timer timer; private final long nackDelayNanos; private final long timerIntervalNanos; + private final NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff; private Timeout timeout; @@ -50,7 +53,14 @@ class NegativeAcksTracker implements Closeable { this.timer = consumer.getClient().timer(); this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()), MIN_NACK_DELAY_NANOS); - this.timerIntervalNanos = nackDelayNanos / 3; + this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff(); + if (negativeAckRedeliveryBackoff != null) { + this.timerIntervalNanos = Math.max( + TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)), + MIN_NACK_DELAY_NANOS) / 3; + } else { + this.timerIntervalNanos = nackDelayNanos / 3; + } } private synchronized void triggerRedelivery(Timeout t) { @@ -95,6 +105,40 @@ class NegativeAcksTracker implements Closeable { } } + public synchronized void add(Message<?> message) { + if (negativeAckRedeliveryBackoff == null) { + add(message.getMessageId()); + return; + } + add(message.getMessageId(), message.getRedeliveryCount()); + } + + private synchronized void add(MessageId messageId, int redeliveryCount) { + if (messageId instanceof TopicMessageIdImpl) { + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + messageId = topicMessageId.getInnerMessageId(); + } + + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), + batchMessageId.getPartitionIndex()); + } + + if (nackedMessages == null) { + nackedMessages = new HashMap<>(); + } + + long backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount)); + nackedMessages.put(messageId, System.nanoTime() + backoffNs); + + if (this.timeout == null) { + // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for + // nack immediately following the current one will be batched into the same redeliver request. + this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); + } + } + @Override public synchronized void close() { if (timeout != null && !timeout.isCancelled()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 1076946..9603992 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; @@ -70,6 +71,9 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable { @JsonIgnore private ConsumerEventListener consumerEventListener; + @JsonIgnore + private NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff; + private int receiverQueueSize = 1000; private long acknowledgementsGroupTimeMicros = TimeUnit.MILLISECONDS.toMicros(100); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java new file mode 100644 index 0000000..206a6f2 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.client.impl.NegativeAckRedeliveryExponentialBackoff; +import org.testng.annotations.Test; + +/** + * Unit test of {@link NegativeAckRedeliveryBackoff}. + */ +public class NegativeAckRedeliveryBackoffTest { + + @SuppressWarnings("deprecation") + @Test + public void testNext() { + + long minNackTime = 1000; + long maxNackTime = 1000 * 60 * 10; + + NegativeAckRedeliveryBackoff nackBackoff = spy( + NegativeAckRedeliveryExponentialBackoff.builder() + .minNackTimeMs(minNackTime) + .maxNackTimeMs(maxNackTime) + .build()); + + assertEquals(nackBackoff.next(-1), minNackTime); + + assertEquals(nackBackoff.next(0), minNackTime); + + assertEquals(nackBackoff.next(1),minNackTime * 2); + + assertEquals(nackBackoff.next(4), minNackTime * 16); + + assertEquals(nackBackoff.next(100), maxNackTime); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 13d63ba..798069c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -302,4 +302,12 @@ public class ConsumerBuilderImplTest { consumerBuilderImpl.subscriptionMode(SubscriptionMode.NonDurable) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); } + + @Test + public void testNegativeAckRedeliveryBackoff() { + consumerBuilderImpl.negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder() + .minNackTimeMs(1000) + .maxNackTimeMs(10 * 1000) + .build()); + } }