This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d377bc9d732 [improve][client] PIP-393: Improve performance of Negative 
Acknowledgement (#23600)
d377bc9d732 is described below

commit d377bc9d7321a66201a301b6887fb1fea3ef8820
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Jan 3 01:58:55 2025 +0800

    [improve][client] PIP-393: Improve performance of Negative Acknowledgement 
(#23600)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 distribution/shell/src/assemble/LICENSE.bin.txt    |   2 +
 pom.xml                                            |   2 +
 .../pulsar/client/impl/NegativeAcksTest.java       |  50 +++++++-
 pulsar-client-admin-shaded/pom.xml                 |  26 ++++
 pulsar-client-all/pom.xml                          |  26 ++++
 pulsar-client-dependencies-minimized/pom.xml       | 100 +++++++++++++++
 pulsar-client-shaded/pom.xml                       |  26 ++++
 pulsar-client/pom.xml                              |  10 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 +-
 .../pulsar/client/impl/NegativeAcksTracker.java    | 139 ++++++++++++++-------
 .../impl/conf/ConsumerConfigurationData.java       |  10 ++
 11 files changed, 344 insertions(+), 51 deletions(-)

diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 05342d17243..3333c9fe6ab 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -418,6 +418,8 @@ The Apache Software License, Version 2.0
     - avro-protobuf-1.11.4.jar
  * RE2j -- re2j-1.7.jar
  * Spotify completable-futures -- completable-futures-0.3.6.jar
+ * RoaringBitmap -- RoaringBitmap-1.2.0.jar
+ * Fastutil -- fastutil-8.5.14.jar
 
 BSD 3-clause "New" or "Revised" License
  * JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
diff --git a/pom.xml b/pom.xml
index 93cd3d5e11f..3cd9bd4b8d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2580,6 +2580,7 @@ flexible messaging model and an intuitive client 
API.</description>
         <module>pulsar-metadata</module>
         <module>jetcd-core-shaded</module>
         <module>jclouds-shaded</module>
+        <module>pulsar-client-dependencies-minimized</module>
 
         <!-- package management releated modules (begin) -->
         <module>pulsar-package-management</module>
@@ -2645,6 +2646,7 @@ flexible messaging model and an intuitive client 
API.</description>
         <module>distribution</module>
         <module>pulsar-metadata</module>
         <module>jetcd-core-shaded</module>
+        <module>pulsar-client-dependencies-minimized</module>
         <!-- package management releated modules (begin) -->
         <module>pulsar-package-management</module>
         <!-- package management releated modules (end) -->
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index b372ecabc5d..f8bc30f0966 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.util.HashSet;
@@ -311,7 +312,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         // negative topic message id
         consumer.negativeAcknowledge(topicMessageId);
         NegativeAcksTracker negativeAcksTracker = 
consumer.getNegativeAcksTracker();
-        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) 
-1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
@@ -319,11 +320,56 @@ public class NegativeAcksTest extends 
ProducerConsumerBase {
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
-        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) 
-1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
     }
 
+    /**
+     * If we nack multiple messages in the same batch with different 
redelivery delays, the messages should be redelivered
+     * with the correct delay. However, all messages are redelivered at the 
same time.
+     * @throws Exception
+     */
+    @Test
+    public void testNegativeAcksWithBatch() throws Exception {
+        cleanup();
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        setup();
+        String topic = 
BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(2)
+                .create();
+        // send two messages in the same batch
+        producer.sendAsync("test-0");
+        producer.sendAsync("test-1");
+        producer.flush();
+
+        // negative ack the first message
+        consumer.negativeAcknowledge(consumer.receive());
+        // wait for 2s, negative ack the second message
+        Thread.sleep(2000);
+        consumer.negativeAcknowledge(consumer.receive());
+
+        // now 2s has passed, the first message should be redelivered 1s later.
+        Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg1);
+    }
+
     @Test
     public void testNegativeAcksWithBatchAckEnabled() throws Exception {
         cleanup();
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index f667a8eb61e..de54c3d0496 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -34,6 +34,17 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-admin-original</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-dependencies-minimized</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -150,6 +161,8 @@
                   <include>org.objenesis:*</include>
                   <include>org.reactivestreams:reactive-streams</include>
                   <include>org.yaml:snakeyaml</include>
+                  
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
+                  <include>org.roaringbitmap:RoaringBitmap</include>
                 </includes>
                 <excludes>
                   
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -269,6 +282,10 @@
                   <pattern>io.swagger</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.io.swagger</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>it.unimi.dsi.fastutil</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.it.unimi.dsi.fastutil</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>javassist</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.javassist</shadedPattern>
@@ -313,6 +330,11 @@
                   
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/</shadedPattern>
                   <rawString>true</rawString>
                 </relocation>
+                <relocation>
+                  <pattern>META-INF/versions/(\d+)/org/roaringbitmap/</pattern>
+                  
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/</shadedPattern>
+                  <rawString>true</rawString>
+                </relocation>
                 <relocation>
                   <pattern>META-INF/versions/(\d+)/org/yaml/</pattern>
                   
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/</shadedPattern>
@@ -374,6 +396,10 @@
                   <pattern>org.reactivestreams</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.roaringbitmap</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.roaringbitmap</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>org.yaml</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 5e30dbd999d..4fec9ff51b8 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -39,6 +39,17 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-dependencies-minimized</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -200,6 +211,8 @@
                   <include>org.reactivestreams:reactive-streams</include>
                   <include>org.tukaani:xz</include>
                   <include>org.yaml:snakeyaml</include>
+                  
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
+                  <include>org.roaringbitmap:RoaringBitmap</include>
                 </includes>
                 <excludes>
                   
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -317,6 +330,10 @@
                   <pattern>io.swagger</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.io.swagger</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>it.unimi.dsi.fastutil</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.it.unimi.dsi.fastutil</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>javassist</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.javassist</shadedPattern>
@@ -361,6 +378,11 @@
                   
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/</shadedPattern>
                   <rawString>true</rawString>
                 </relocation>
+                <relocation>
+                  <pattern>META-INF/versions/(\d+)/org/roaringbitmap/</pattern>
+                  
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/</shadedPattern>
+                  <rawString>true</rawString>
+                </relocation>
                 <relocation>
                   <pattern>META-INF/versions/(\d+)/org/yaml/</pattern>
                   
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/</shadedPattern>
@@ -439,6 +461,10 @@
                   <pattern>org.reactivestreams</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.roaringbitmap</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.roaringbitmap</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>org.tukaani</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
diff --git a/pulsar-client-dependencies-minimized/pom.xml 
b/pulsar-client-dependencies-minimized/pom.xml
new file mode 100644
index 00000000000..e838fedfddc
--- /dev/null
+++ b/pulsar-client-dependencies-minimized/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>4.1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-client-dependencies-minimized</artifactId>
+  <name>Apache Pulsar :: Client :: Dependencies minimized</name>
+  <description>This module is used in `pulsar-client-all`, 
`pulsar-client-shaded`, and `pulsar-client-admin-shaded`
+    to minimize the number of classes included in the shaded jars for specific 
dependencies.
+    Currently, it is used to minimize the classes included from `fastutil`.
+  </description>
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <finalName>${project.artifactId}-${project.version}</finalName>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <!-- Skips the deployment of the minimized dependencies to Maven 
Central as this is an intermediate
+ module used for building the shaded client jars -->
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              
<promoteTransitiveDependencies>false</promoteTransitiveDependencies>
+              <!-- minimize the classes included in the shaded jar -->
+              <minimizeJar>true</minimizeJar>
+              <artifactSet>
+                <includes>
+                  <!-- The Pulsar module that references the library being 
minimized -->
+                  <include>org.apache.pulsar:pulsar-client-original</include>
+                  <!-- Currently, only fastutil is minimized -->
+                  <include>it.unimi.dsi:fastutil</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <!--
+                This filter specifies the classes that use the dependencies.
+                Both includes and excludes are set to **.
+                -->
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                  <excludes>
+                    <exclude>**</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 62bab3cb2d7..d8adacbe8a0 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -39,6 +39,17 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-dependencies-minimized</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -164,6 +175,8 @@
                   <include>org.reactivestreams:reactive-streams</include>
                   <include>org.tukaani:xz</include>
                   <include>org.yaml:snakeyaml</include>
+                  
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
+                  <include>org.roaringbitmap:RoaringBitmap</include>
                 </includes>
                 <excludes>
                   
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -263,6 +276,10 @@
                   <pattern>io.swagger</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.io.swagger</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>it.unimi.dsi.fastutil</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.it.unimi.dsi.fastutil</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>javax.activation</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.javax.activation</shadedPattern>
@@ -281,6 +298,11 @@
                   </shadedPattern>
                   <rawString>true</rawString>
                 </relocation>
+                <relocation>
+                  <pattern>META-INF/versions/(\d+)/org/roaringbitmap/</pattern>
+                  
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/</shadedPattern>
+                  <rawString>true</rawString>
+                </relocation>
                 <relocation>
                   <pattern>META-INF/versions/(\d+)/org/yaml/</pattern>
                   
<shadedPattern>META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/</shadedPattern>
@@ -343,6 +365,10 @@
                   <pattern>org.reactivestreams</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.roaringbitmap</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.roaringbitmap</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>org.tukaani</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 49bb3c6490a..e1a70ed0748 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -207,6 +207,16 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 16dc70f736e..86af4bdaf58 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2752,7 +2752,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         int messagesFromQueue = 0;
         Message<T> peek = incomingMessages.peek();
         if (peek != null) {
-            MessageIdAdv messageId = 
MessageIdAdvUtils.discardBatch(peek.getMessageId());
+            MessageId messageId = 
NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId());
             if (!messageIds.contains(messageId)) {
                 // first message is not expired, then no message is expired in 
queue.
                 return 0;
@@ -2763,7 +2763,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             while (message != null) {
                 decreaseIncomingMessageSize(message);
                 messagesFromQueue++;
-                MessageIdAdv id = 
MessageIdAdvUtils.discardBatch(message.getMessageId());
+                MessageId id = 
NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId());
                 if (!messageIds.contains(id)) {
                     messageIds.add(id);
                     break;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 5256ebf04f4..273880569c3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -22,9 +22,13 @@ import static 
org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMess
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
+import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
 import java.io.Closeable;
 import java.util.HashSet;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Message;
@@ -32,40 +36,37 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(NegativeAcksTracker.class);
 
-    private ConcurrentLongLongPairHashMap nackedMessages = null;
+    // timestamp -> ledgerId -> entryId. The batch index is not tracked; if different 
messages have
+    // different timestamps, there will be multiple entries in the map.
+    // AVL tree -> Long2ObjectOpenHashMap -> Roaring64Bitmap
+    private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> 
nackedMessages = null;
 
     private final ConsumerBase<?> consumer;
     private final Timer timer;
-    private final long nackDelayNanos;
-    private final long timerIntervalNanos;
+    private final long nackDelayMs;
     private final RedeliveryBackoff negativeAckRedeliveryBackoff;
+    private final int negativeAckPrecisionBitCnt;
 
     private Timeout timeout;
 
     // Set a min delay to allow for grouping nacks within a single batch
-    private static final long MIN_NACK_DELAY_NANOS = 
TimeUnit.MILLISECONDS.toNanos(100);
-    private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = 
Long.MAX_VALUE;
+    private static final long MIN_NACK_DELAY_MS = 100;
+    private static final int DUMMY_PARTITION_INDEX = -2;
 
     public NegativeAcksTracker(ConsumerBase<?> consumer, 
ConsumerConfigurationData<?> conf) {
         this.consumer = consumer;
         this.timer = consumer.getClient().timer();
-        this.nackDelayNanos = 
Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
-                MIN_NACK_DELAY_NANOS);
+        this.nackDelayMs = 
Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
+                MIN_NACK_DELAY_MS);
         this.negativeAckRedeliveryBackoff = 
conf.getNegativeAckRedeliveryBackoff();
-        if (negativeAckRedeliveryBackoff != null) {
-            this.timerIntervalNanos = Math.max(
-                    
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
-                    MIN_NACK_DELAY_NANOS) / 3;
-        } else {
-            this.timerIntervalNanos = nackDelayNanos / 3;
-        }
+        this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
     }
 
     private void triggerRedelivery(Timeout t) {
@@ -76,21 +77,48 @@ class NegativeAcksTracker implements Closeable {
                 return;
             }
 
-            long now = System.nanoTime();
-            nackedMessages.forEach((ledgerId, entryId, partitionIndex, 
timestamp) -> {
-                if (timestamp < now) {
-                    MessageId msgId = new MessageIdImpl(ledgerId, entryId,
-                            // need to covert non-partitioned topic partition 
index to -1
-                            (int) (partitionIndex == 
NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
-                    addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, 
messagesToRedeliver, this.consumer);
-                    messagesToRedeliver.add(msgId);
+            long currentTimestamp = System.currentTimeMillis();
+            for (long timestamp : nackedMessages.keySet()) {
+                if (timestamp > currentTimestamp) {
+                    // We are done with all the messages that need to be 
redelivered
+                    break;
+                }
+
+                Long2ObjectMap<Roaring64Bitmap> ledgerMap = 
nackedMessages.get(timestamp);
+                for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : 
ledgerMap.long2ObjectEntrySet()) {
+                    long ledgerId = ledgerEntry.getLongKey();
+                    Roaring64Bitmap entrySet = ledgerEntry.getValue();
+                    entrySet.forEach(entryId -> {
+                        MessageId msgId = new MessageIdImpl(ledgerId, entryId, 
DUMMY_PARTITION_INDEX);
+                        addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, 
messagesToRedeliver, this.consumer);
+                        messagesToRedeliver.add(msgId);
+                    });
+                }
+            }
+
+            // remove entries from the nackedMessages map
+            LongBidirectionalIterator iterator = 
nackedMessages.keySet().iterator();
+            while (iterator.hasNext()) {
+                long timestamp = iterator.nextLong();
+                if (timestamp <= currentTimestamp) {
+                    iterator.remove();
+                } else {
+                    break;
                 }
-            });
-            for (MessageId messageId : messagesToRedeliver) {
-                nackedMessages.remove(((MessageIdImpl) 
messageId).getLedgerId(),
-                        ((MessageIdImpl) messageId).getEntryId());
             }
-            this.timeout = timer.newTimeout(this::triggerRedelivery, 
timerIntervalNanos, TimeUnit.NANOSECONDS);
+
+            // Schedule the next redelivery if there are still messages to 
redeliver
+            if (!nackedMessages.isEmpty()) {
+                long nextTriggerTimestamp = nackedMessages.firstLongKey();
+                long delayMs = Math.max(nextTriggerTimestamp - 
currentTimestamp, 0);
+                if (delayMs > 0) {
+                    this.timeout = timer.newTimeout(this::triggerRedelivery, 
delayMs, TimeUnit.MILLISECONDS);
+                } else {
+                    this.timeout = timer.newTimeout(this::triggerRedelivery, 
0, TimeUnit.MILLISECONDS);
+                }
+            } else {
+                this.timeout = null;
+            }
         }
 
         // release the lock of NegativeAcksTracker before calling 
consumer.redeliverUnacknowledgedMessages,
@@ -110,39 +138,56 @@ class NegativeAcksTracker implements Closeable {
         add(message.getMessageId(), message.getRedeliveryCount());
     }
 
+    static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
     private synchronized void add(MessageId messageId, int redeliveryCount) {
         if (nackedMessages == null) {
-            nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
-                    .autoShrink(true)
-                    .concurrencyLevel(1)
-                    .build();
+            nackedMessages = new Long2ObjectAVLTreeMap<>();
         }
 
-        long backoffNs;
+        long backoffMs;
         if (negativeAckRedeliveryBackoff != null) {
-            backoffNs = 
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+            backoffMs = 
TimeUnit.MILLISECONDS.toMillis(negativeAckRedeliveryBackoff.next(redeliveryCount));
         } else {
-            backoffNs = nackDelayNanos;
+            backoffMs = nackDelayMs;
         }
-        MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
-        // ConcurrentLongLongPairHashMap requires the key and value >=0.
-        // partitionIndex is -1 if the message is from a non-partitioned 
topic, but we don't use
-        // partitionIndex actually, so we can set it to Long.MAX_VALUE in the 
case of non-partitioned topic to
-        // avoid exception from ConcurrentLongLongPairHashMap.
-        nackedMessages.put(messageIdAdv.getLedgerId(), 
messageIdAdv.getEntryId(),
-                messageIdAdv.getPartitionIndex() >= 0 ? 
messageIdAdv.getPartitionIndex() :
-                        NON_PARTITIONED_TOPIC_PARTITION_INDEX, 
System.nanoTime() + backoffNs);
+        MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+        long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, 
negativeAckPrecisionBitCnt);
+        nackedMessages.computeIfAbsent(timestamp, k -> new 
Long2ObjectOpenHashMap<>())
+                .computeIfAbsent(messageIdAdv.getLedgerId(), k -> new 
Roaring64Bitmap())
+                .add(messageIdAdv.getEntryId());
 
         if (this.timeout == null) {
             // Schedule a task and group all the redeliveries for same period. 
Leave a small buffer to allow for
             // nack immediately following the current one will be batched into 
the same redeliver request.
-            this.timeout = timer.newTimeout(this::triggerRedelivery, 
timerIntervalNanos, TimeUnit.NANOSECONDS);
+            this.timeout = timer.newTimeout(this::triggerRedelivery, 
backoffMs, TimeUnit.MILLISECONDS);
         }
     }
 
+    /**
+     * Discard the batch index and partition index from the message id.
+     *
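+     * @param messageId the message id to strip the batch index and partition index from
+     * @return the given id itself for a chunk message id, otherwise a new id with the same ledger and entry id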
+     */
+    public static MessageIdAdv discardBatchAndPartitionIndex(MessageId 
messageId) {
+        if (messageId instanceof ChunkMessageIdImpl) {
+            return (MessageIdAdv) messageId;
+        }
+        MessageIdAdv msgId = (MessageIdAdv) messageId;
+        return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), 
DUMMY_PARTITION_INDEX);
+    }
+
     @VisibleForTesting
-    Optional<Long> getNackedMessagesCount() {
-        return 
Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
+    synchronized long getNackedMessagesCount() {
+        if (nackedMessages == null) {
+            return 0;
+        }
+        return nackedMessages.values().stream().mapToLong(
+                ledgerMap -> ledgerMap.values().stream().mapToLong(
+                        Roaring64Bitmap::getLongCardinality).sum()).sum();
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index cd82b54618f..dc9251a975c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -156,6 +156,16 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
     )
     private long negativeAckRedeliveryDelayMicros = 
TimeUnit.MINUTES.toMicros(1);
 
+    @ApiModelProperty(
+            name = "negativeAckPrecisionBitCnt",
+            value = "The redelivery time precision bit count. The lower bits 
of the redelivery time will be"
+                    + " trimmed to reduce the memory occupation.\nThe default 
value is 8, which means the"
+                    + " redelivery time will be bucketed by 256ms, the 
redelivery time could be earlier (no later)"
+                    + " than the expected time, but no more than 256ms. \nIf 
set to k, the redelivery time will be"
+                    + " bucketed by 2^k ms.\nIf the value is 0, the redelivery 
time will be accurate to ms."
+    )
+    private int negativeAckPrecisionBitCnt = 8;
+
     @ApiModelProperty(
             name = "maxTotalReceiverQueueSizeAcrossPartitions",
             value = "The max total receiver queue size across partitions.\n"

Reply via email to