This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d79cd04  Add negative ack redelivery backoff. (#12566)
d79cd04 is described below

commit d79cd0479eabebef2ce72eca1330af103115f67f
Author: hanmz <gunn...@tencent.com>
AuthorDate: Thu Nov 4 14:15:17 2021 +0800

    Add negative ack redelivery backoff. (#12566)
    
    ### Motivation
    
    Add negative ack redelivery backoff.
    
    
    ### Modifications
    
    - add new `NegativeAckRedeliveryBackoff` interface
    - expose `negativeAckRedeliveryBackoff` in ConsumerBuilder
    - add unit test case
---
 .../pulsar/client/impl/NegativeAcksTest.java       | 102 +++++++++++++++++++++
 .../pulsar/client/api/ConsumerConfiguration.java   |  17 ++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  14 +++
 .../client/api/NegativeAckRedeliveryBackoff.java   |  40 ++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   8 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   8 ++
 .../client/impl/MultiTopicsConsumerImpl.java       |  10 ++
 .../NegativeAckRedeliveryExponentialBackoff.java   |  94 +++++++++++++++++++
 .../pulsar/client/impl/NegativeAcksTracker.java    |  46 +++++++++-
 .../impl/conf/ConsumerConfigurationData.java       |   4 +
 .../api/NegativeAckRedeliveryBackoffTest.java      |  55 +++++++++++
 .../client/impl/ConsumerBuilderImplTest.java       |   8 ++
 12 files changed, 405 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 5eb43af..638a969 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
@@ -154,4 +155,105 @@ public class NegativeAcksTest extends 
ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    @DataProvider(name = "variationsBackoff")
+    public static Object[][] variationsBackoff() {
+        return new Object[][] {
+                // batching / partitions / subscription-type / 
min-nack-time-ms/ max-nack-time-ms / ack-timeout
+                { false, false, SubscriptionType.Shared, 100, 1000, 0 },
+                { false, false, SubscriptionType.Failover, 100, 1000, 0 },
+                { false, true, SubscriptionType.Shared, 100, 1000, 0 },
+                { false, true, SubscriptionType.Failover, 100, 1000, 0 },
+                { true, false, SubscriptionType.Shared, 100, 1000, 0 },
+                { true, false, SubscriptionType.Failover, 100, 1000, 0 },
+                { true, true, SubscriptionType.Shared, 100, 1000, 0 },
+                { true, true, SubscriptionType.Failover, 100, 1000, 0 },
+
+                { false, false, SubscriptionType.Shared, 0, 1000, 0 },
+                { false, false, SubscriptionType.Failover, 0, 1000, 0 },
+                { false, true, SubscriptionType.Shared, 0, 1000, 0 },
+                { false, true, SubscriptionType.Failover, 0, 1000, 0 },
+                { true, false, SubscriptionType.Shared, 0, 1000, 0 },
+                { true, false, SubscriptionType.Failover, 0, 1000, 0 },
+                { true, true, SubscriptionType.Shared, 0, 1000, 0 },
+                { true, true, SubscriptionType.Failover, 0, 1000, 0 },
+
+                { false, false, SubscriptionType.Shared, 100, 1000, 1000 },
+                { false, false, SubscriptionType.Failover, 100, 1000, 1000 },
+                { false, true, SubscriptionType.Shared, 100, 1000, 1000 },
+                { false, true, SubscriptionType.Failover, 100, 1000, 1000 },
+                { true, false, SubscriptionType.Shared, 100, 1000, 1000 },
+                { true, false, SubscriptionType.Failover, 100, 1000, 1000 },
+                { true, true, SubscriptionType.Shared, 100, 1000, 1000 },
+                { true, true, SubscriptionType.Failover, 100, 1000, 1000 },
+
+                { false, false, SubscriptionType.Shared, 0, 1000, 1000 },
+                { false, false, SubscriptionType.Failover, 0, 1000, 1000 },
+                { false, true, SubscriptionType.Shared, 0, 1000, 1000 },
+                { false, true, SubscriptionType.Failover, 0, 1000, 1000 },
+                { true, false, SubscriptionType.Shared, 0, 1000, 1000 },
+                { true, false, SubscriptionType.Failover, 0, 1000, 1000 },
+                { true, true, SubscriptionType.Shared, 0, 1000, 1000 },
+                { true, true, SubscriptionType.Failover, 0, 1000, 1000 },
+        };
+    }
+
+    @Test(dataProvider = "variationsBackoff")
+    public void testNegativeAcksWithBackoff(boolean batching, boolean 
usePartitions, SubscriptionType subscriptionType,
+            int minNackTimeMs, int maxNackTimeMs, int ackTimeout)
+            throws Exception {
+        log.info("Test negative acks with back off batching={} partitions={} 
subType={} minNackTimeMs={}, "
+                        + "maxNackTimeMs={}", batching, usePartitions, 
subscriptionType, minNackTimeMs, maxNackTimeMs);
+        String topic = 
BrokerTestUtil.newUniqueName("testNegativeAcksWithBackoff");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(subscriptionType)
+                
.negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
+                        .minNackTimeMs(minNackTimeMs)
+                        .maxNackTimeMs(maxNackTimeMs)
+                        .build())
+                .ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(batching)
+                .create();
+
+        Set<String> sentMessages = new HashSet<>();
+
+        final int N = 10;
+        for (int i = 0; i < N; i++) {
+            String value = "test-" + i;
+            producer.sendAsync(value);
+            sentMessages.add(value);
+        }
+        producer.flush();
+
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            consumer.negativeAcknowledge(msg);
+        }
+
+        Set<String> receivedMessages = new HashSet<>();
+
+        // All the messages should be received again
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            receivedMessages.add(msg.getValue());
+            consumer.acknowledge(msg);
+        }
+
+        assertEquals(receivedMessages, sentMessages);
+
+        // There should be no more messages
+        assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
+        consumer.close();
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 291cce3..3de0187 100644
--- 
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -375,4 +375,21 @@ public class ConsumerConfiguration implements Serializable 
{
     public SubscriptionInitialPosition getSubscriptionInitialPosition(){
         return conf.getSubscriptionInitialPosition();
     }
+
+    /**
+     * @return the configured {@link NegativeAckRedeliveryBackoff} for the 
consumer
+     */
+    public NegativeAckRedeliveryBackoff getNegativeAckRedeliveryBackoff() {
+        return conf.getNegativeAckRedeliveryBackoff();
+    }
+
+    /**
+     * @param negativeAckRedeliveryBackoff the negative ack redelivery backoff 
policy.
+     * Default value is: NegativeAckRedeliveryExponentialBackoff
+     * @return the {@link ConsumerConfiguration}
+     */
+    public ConsumerConfiguration 
setNegativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff 
negativeAckRedeliveryBackoff) {
+        conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
+        return this;
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 3c3ce17..af1ece8 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -749,4 +749,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * Default: null
      */
     ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor 
payloadProcessor);
+
+    /**
+     * Notice: the negativeAckRedeliveryBackoff will not work with 
`consumer.negativeAcknowledge(MessageId messageId)`
+     * because we are not able to get the redelivery count from the message ID.
+     *
+     * <p>Example:
+     * <pre>
+     * 
client.newConsumer().negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
+     *              .minNackTimeMs(1000)
+     *              .maxNackTimeMs(60 * 1000)
+     *              .build()).subscribe();
+     * </pre>
+     */
+    ConsumerBuilder<T> 
negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff 
negativeAckRedeliveryBackoff);
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java
new file mode 100644
index 0000000..8e19c85
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoff.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Interface for custom message is negativeAcked policy, users can specify a 
{@link NegativeAckRedeliveryBackoff} for
+ * a consumer.
+ *
+ * Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+ * {@link NegativeAckRedeliveryBackoff}, which means the message might get 
redelivered earlier than the delay time
+ * from the backoff.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface NegativeAckRedeliveryBackoff extends Serializable  {
+    /**
+     * @param redeliveryCount indicates the number of times the message was 
redelivered
+     */
+    long next(int redeliveryCount);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cbfc27d..d08dbda 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -486,4 +487,11 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         conf.setPayloadProcessor(payloadProcessor);
         return this;
     }
+
+    @Override
+    public ConsumerBuilder<T> 
negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff 
negativeAckRedeliveryBackoff) {
+        checkArgument(negativeAckRedeliveryBackoff != null, 
"negativeAckRedeliveryBackoff must not be null.");
+        conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
+        return this;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8245de6..79aad9c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -689,6 +689,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
+    public void negativeAcknowledge(Message<?> message) {
+        negativeAcksTracker.add(message);
+
+        // Ensure the message is not redelivered for ack-timeout, since we did 
receive an "ack"
+        unAckedMessageTracker.remove(message.getMessageId());
+    }
+
+    @Override
     public void connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 21ae2d7..f765e5e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -523,6 +523,16 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     @Override
+    public void negativeAcknowledge(Message<?> message) {
+        MessageId messageId = message.getMessageId();
+        checkArgument(messageId instanceof TopicMessageIdImpl);
+        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+
+        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
+        consumer.negativeAcknowledge(message);
+    }
+
+    @Override
     public CompletableFuture<Void> unsubscribeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil.failedFuture(
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java
new file mode 100644
index 0000000..9b0eb7a
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAckRedeliveryExponentialBackoff.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
+
+/**
+ * NegativeAckRedeliveryExponentialBackoff
+ */
+public class NegativeAckRedeliveryExponentialBackoff implements 
NegativeAckRedeliveryBackoff {
+
+    private final long minNackTimeMs;
+    private final long maxNackTimeMs;
+    private final int maxBitShift;
+
+    private NegativeAckRedeliveryExponentialBackoff(long minNackTimeMs, long 
maxNackTimeMs) {
+        this.minNackTimeMs = minNackTimeMs;
+        this.maxNackTimeMs = maxNackTimeMs;
+
+        for (int i = 0; ; ) {
+            if (this.minNackTimeMs << ++i <= 0) {
+                this.maxBitShift = i;
+                break;
+            }
+        }
+    }
+
+    public static 
NegativeAckRedeliveryExponentialBackoff.NegativeAckRedeliveryExponentialBackoffBuilder
 builder() {
+        return new 
NegativeAckRedeliveryExponentialBackoff.NegativeAckRedeliveryExponentialBackoffBuilder();
+    }
+
+    public long getMinNackTimeMs() {
+        return this.minNackTimeMs;
+    }
+
+    public long getMaxNackTimeMs() {
+        return this.maxNackTimeMs;
+    }
+
+    @Override
+    public long next(int redeliveryCount) {
+        if (redeliveryCount <= 0 || minNackTimeMs <= 0) {
+            return this.minNackTimeMs;
+        }
+
+        if (this.maxBitShift <= redeliveryCount) {
+            return this.maxNackTimeMs;
+        }
+
+        return Math.min(this.minNackTimeMs << redeliveryCount, 
this.maxNackTimeMs);
+    }
+
+    /**
+     * Builder of NegativeAckRedeliveryExponentialBackoff.
+     */
+    public static class NegativeAckRedeliveryExponentialBackoffBuilder {
+        private long minNackTimeMs = 1000 * 10;
+        private long maxNackTimeMs = 1000 * 60 * 10;
+
+        public NegativeAckRedeliveryExponentialBackoffBuilder 
minNackTimeMs(long minNackTimeMs) {
+            this.minNackTimeMs = minNackTimeMs;
+            return this;
+        }
+
+        public NegativeAckRedeliveryExponentialBackoffBuilder 
maxNackTimeMs(long maxNackTimeMs) {
+            this.maxNackTimeMs = maxNackTimeMs;
+            return this;
+        }
+
+        public NegativeAckRedeliveryExponentialBackoff build() {
+            Preconditions.checkArgument(minNackTimeMs >= 0, "min nack time 
must be >= 0");
+            Preconditions.checkArgument(maxNackTimeMs >= minNackTimeMs,
+                    "max nack time must be >= minNackTimeMs");
+            return new NegativeAckRedeliveryExponentialBackoff(minNackTimeMs, 
maxNackTimeMs);
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index a062009..5accee4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -27,7 +27,9 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import static 
org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
 
@@ -39,6 +41,7 @@ class NegativeAcksTracker implements Closeable {
     private final Timer timer;
     private final long nackDelayNanos;
     private final long timerIntervalNanos;
+    private final NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff;
 
     private Timeout timeout;
 
@@ -50,7 +53,14 @@ class NegativeAcksTracker implements Closeable {
         this.timer = consumer.getClient().timer();
         this.nackDelayNanos = 
Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
                 MIN_NACK_DELAY_NANOS);
-        this.timerIntervalNanos = nackDelayNanos / 3;
+        this.negativeAckRedeliveryBackoff = 
conf.getNegativeAckRedeliveryBackoff();
+        if (negativeAckRedeliveryBackoff != null) {
+            this.timerIntervalNanos = Math.max(
+                    
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
+                    MIN_NACK_DELAY_NANOS) / 3;
+        } else {
+            this.timerIntervalNanos = nackDelayNanos / 3;
+        }
     }
 
     private synchronized void triggerRedelivery(Timeout t) {
@@ -95,6 +105,40 @@ class NegativeAcksTracker implements Closeable {
         }
     }
 
+    public synchronized void add(Message<?> message) {
+        if (negativeAckRedeliveryBackoff == null) {
+            add(message.getMessageId());
+            return;
+        }
+        add(message.getMessageId(), message.getRedeliveryCount());
+    }
+
+    private synchronized void add(MessageId messageId, int redeliveryCount) {
+        if (messageId instanceof TopicMessageIdImpl) {
+            TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+            messageId = topicMessageId.getInnerMessageId();
+        }
+
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            messageId = new MessageIdImpl(batchMessageId.getLedgerId(), 
batchMessageId.getEntryId(),
+                    batchMessageId.getPartitionIndex());
+        }
+
+        if (nackedMessages == null) {
+            nackedMessages = new HashMap<>();
+        }
+
+        long backoffNs = 
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+        nackedMessages.put(messageId, System.nanoTime() + backoffNs);
+
+        if (this.timeout == null) {
+            // Schedule a task and group all the redeliveries for same period. 
Leave a small buffer to allow for
+            // nack immediately following the current one will be batched into 
the same redeliver request.
+            this.timeout = timer.newTimeout(this::triggerRedelivery, 
timerIntervalNanos, TimeUnit.NANOSECONDS);
+        }
+    }
+
     @Override
     public synchronized void close() {
         if (timeout != null && !timeout.isCancelled()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 1076946..9603992 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessagePayloadProcessor;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
@@ -70,6 +71,9 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
     @JsonIgnore
     private ConsumerEventListener consumerEventListener;
 
+    @JsonIgnore
+    private NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff;
+
     private int receiverQueueSize = 1000;
 
     private long acknowledgementsGroupTimeMicros = 
TimeUnit.MILLISECONDS.toMicros(100);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java
new file mode 100644
index 0000000..206a6f2
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/NegativeAckRedeliveryBackoffTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.impl.NegativeAckRedeliveryExponentialBackoff;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link NegativeAckRedeliveryBackoff}.
+ */
+public class NegativeAckRedeliveryBackoffTest {
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testNext() {
+
+        long minNackTime = 1000;
+        long maxNackTime = 1000 * 60 * 10;
+
+        NegativeAckRedeliveryBackoff nackBackoff = spy(
+                NegativeAckRedeliveryExponentialBackoff.builder()
+                        .minNackTimeMs(minNackTime)
+                        .maxNackTimeMs(maxNackTime)
+                        .build());
+
+        assertEquals(nackBackoff.next(-1), minNackTime);
+
+        assertEquals(nackBackoff.next(0), minNackTime);
+
+        assertEquals(nackBackoff.next(1),minNackTime * 2);
+
+        assertEquals(nackBackoff.next(4), minNackTime * 16);
+
+        assertEquals(nackBackoff.next(100), maxNackTime);
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 13d63ba..798069c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -302,4 +302,12 @@ public class ConsumerBuilderImplTest {
         consumerBuilderImpl.subscriptionMode(SubscriptionMode.NonDurable)
             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
     }
+
+    @Test
+    public void testNegativeAckRedeliveryBackoff() {
+        
consumerBuilderImpl.negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
+                .minNackTimeMs(1000)
+                .maxNackTimeMs(10 * 1000)
+                .build());
+    }
 }

Reply via email to