This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 31b95ec08a94f6ecf04a6a5d64394b1996191d89
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Feb 4 18:46:37 2026 +0800

    [fix][client] Fix race condition between isDuplicate() and flushAsync() 
method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty 
Recycler (#25208)
    
    (cherry picked from commit 5aab2f00f37c56303cb4efb86b3728d055eed7b1)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 +++--
 .../PersistentAcknowledgmentsGroupingTracker.java  | 25 +++++------
 .../impl/AcknowledgementsGroupingTrackerTest.java  | 50 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    | 11 +++--
 .../collections/ConcurrentBitSetRecyclable.java    |  1 +
 5 files changed, 74 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6d5305136b1..8c6385cf806 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -136,7 +136,7 @@ import org.apache.pulsar.common.util.ExceptionHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -3184,7 +3184,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             } else {
                 if (Commands.peerSupportsMultiMessageAcknowledgment(
                         getClientCnx().getRemoteEndpointProtocolVersion())) {
-                    List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck =
+                    List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
                             new ArrayList<>(chunkMsgIds.length);
                     for (MessageIdImpl cMsgId : chunkMsgIds) {
                         if (cMsgId != null && chunkMsgIds.length > 1) {
@@ -3222,7 +3222,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
-                                                  List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>> entries,
+                                                  List<Triple<Long, Long, 
ConcurrentBitSet>> entries,
                                                   long requestID) {
         BaseCommand cmd = newMultiMessageAckCommon(entries);
         cmd.getAck()
@@ -3241,7 +3241,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     };
 
-    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, 
Long, ConcurrentBitSetRecyclable>> entries) {
+    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, 
Long, ConcurrentBitSet>> entries) {
         BaseCommand cmd = LOCAL_BASE_COMMAND.get()
                 .clear()
                 .setType(BaseCommand.Type.ACK);
@@ -3250,7 +3250,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         for (int i = 0; i < entriesCount; i++) {
             long ledgerId = entries.get(i).getLeft();
             long entryId = entries.get(i).getMiddle();
-            ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+            ConcurrentBitSet bitSet = entries.get(i).getRight();
             MessageIdData msgId = ack.addMessageId()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId);
@@ -3259,7 +3259,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 for (int j = 0; j < ackSet.length; j++) {
                     msgId.addAckSet(ackSet[j]);
                 }
-                bitSet.recycle();
             }
         }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index b814d261fd7..0598dc4fb36 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
 import org.jspecify.annotations.Nullable;
 
 /**
@@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      */
     private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
     @VisibleForTesting
-    final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> 
pendingIndividualBatchIndexAcks;
+    final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet> 
pendingIndividualBatchIndexAcks;
 
     private final ScheduledFuture<?> scheduledTask;
     private final boolean batchIndexAckEnabled;
@@ -133,7 +133,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                 return true;
             }
             if (messageIdAdv.getBatchIndex() >= 0) {
-                ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.get(key);
+                ConcurrentBitSet bitSet = 
pendingIndividualBatchIndexAcks.get(key);
                 return bitSet != null && 
!bitSet.get(messageIdAdv.getBatchIndex());
             }
             return false;
@@ -327,21 +327,22 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
 
     @VisibleForTesting
     CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
-        ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
+        ConcurrentBitSet bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
                 MessageIdAdvUtils.discardBatch(msgId), __ -> {
                     final BitSet ackSet = msgId.getAckSet();
-                    final ConcurrentBitSetRecyclable value;
+                    final ConcurrentBitSet value;
                     if (ackSet != null) {
                         synchronized (ackSet) {
                             if (!ackSet.isEmpty()) {
-                                value = 
ConcurrentBitSetRecyclable.create(ackSet);
+                                value =  new ConcurrentBitSet();
+                                value.or(ackSet);
                             } else {
-                                value = ConcurrentBitSetRecyclable.create();
+                                value = new ConcurrentBitSet();
                                 value.set(0, msgId.getBatchSize());
                             }
                         }
                     } else {
-                        value = ConcurrentBitSetRecyclable.create();
+                        value = new ConcurrentBitSet();
                         value.set(0, msgId.getBatchSize());
                     }
                     return value;
@@ -445,7 +446,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
 
         // Flush all individual acks
-        List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
+        List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
                 new ArrayList<>(pendingIndividualAcks.size() + 
pendingIndividualBatchIndexAcks.size());
         if (!pendingIndividualAcks.isEmpty()) {
             if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
@@ -487,7 +488,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
 
         while (true) {
-            Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
+            Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
                     pendingIndividualBatchIndexAcks.pollFirstEntry();
             if (entry == null) {
                 // The entry has been removed in a different thread
@@ -539,7 +540,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         // cumulative ack chunk by the last messageId
         if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
             if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
-                List<Triple<Long, Long, ConcurrentBitSetRecyclable>> 
entriesToAck = new ArrayList<>(chunkMsgIds.length);
+                List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new 
ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), 
cMsgId.getEntryId(), null));
@@ -568,7 +569,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             long entryId, BitSetRecyclable ackSet, AckType ackType,
             Map<String, Long> properties, boolean flush,
             TimedCompletableFuture<Void> timedCompletableFuture,
-            List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) 
{
+            List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
         if (consumer.isAckReceiptEnabled()) {
             final long requestId = consumer.getClient().newRequestId();
             final ByteBuf cmd;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 7a8222473a3..1f46f9da3a4 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -31,8 +31,10 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -405,6 +407,54 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }
 
+    @Test
+    public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws 
Exception {
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        conf.setMaxAcknowledgmentGroupSize(1);
+        PersistentAcknowledgmentsGroupingTracker tracker =
+                new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
+
+        BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 
0, 10, null);
+        BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 
1, 10, null);
+
+        int loops = 10000;
+        int addAcknowledgmentThreadCount = 10;
+        List<Thread> addAcknowledgmentThreads = new 
ArrayList<>(addAcknowledgmentThreadCount);
+        for (int i = 0; i < addAcknowledgmentThreadCount; i++) {
+            Thread addAcknowledgmentThread = new Thread(() -> {
+                for (int j = 0; j < loops; j++) {
+                    tracker.addAcknowledgment(batchMessageId0, 
AckType.Individual, Collections.emptyMap());
+                }
+            }, "doIndividualBatchAck-thread-" + i);
+            addAcknowledgmentThread.start();
+            addAcknowledgmentThreads.add(addAcknowledgmentThread);
+        }
+
+        int isDuplicateThreadCount = 10;
+        AtomicBoolean assertResult = new AtomicBoolean();
+        List<Thread> isDuplicateThreads = new 
ArrayList<>(isDuplicateThreadCount);
+        for (int i = 0; i < isDuplicateThreadCount; i++) {
+            Thread isDuplicateThread = new Thread(() -> {
+                for (int j = 0; j < loops; j++) {
+                    boolean duplicate = tracker.isDuplicate(batchMessageId1);
+                    assertResult.set(assertResult.get() || duplicate);
+                }
+            }, "isDuplicate-thread-" + i);
+            isDuplicateThread.start();
+            isDuplicateThreads.add(isDuplicateThread);
+        }
+
+        for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) {
+            addAcknowledgmentThread.join();
+        }
+
+        for (Thread isDuplicateThread : isDuplicateThreads) {
+            isDuplicateThread.join();
+        }
+
+        assertFalse(assertResult.get());
+    }
+
     public class ClientCnxTest extends ClientCnx {
 
         public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index a6796387f91..deca71e9361 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -112,7 +112,7 @@ import 
org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
 
 @UtilityClass
 @Slf4j
@@ -1035,7 +1035,7 @@ public class Commands {
     }
 
     public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID 
txnID,
-            List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
+            List<Triple<Long, Long, ConcurrentBitSet>> entries) {
         BaseCommand cmd = newMultiMessageAckCommon(entries);
         cmd.getAck()
                 .setConsumerId(consumerId)
@@ -1045,14 +1045,14 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, 
Long, ConcurrentBitSetRecyclable>> entries) {
+    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, 
Long, ConcurrentBitSet>> entries) {
         BaseCommand cmd = localCmd(Type.ACK);
         CommandAck ack = cmd.setAck();
         int entriesCount = entries.size();
         for (int i = 0; i < entriesCount; i++) {
             long ledgerId = entries.get(i).getLeft();
             long entryId = entries.get(i).getMiddle();
-            ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+            ConcurrentBitSet bitSet = entries.get(i).getRight();
             MessageIdData msgId = ack.addMessageId()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId);
@@ -1061,7 +1061,6 @@ public class Commands {
                 for (int j = 0; j < ackSet.length; j++) {
                     msgId.addAckSet(ackSet[j]);
                 }
-                bitSet.recycle();
             }
         }
 
@@ -1069,7 +1068,7 @@ public class Commands {
     }
 
     public static ByteBuf newMultiMessageAck(long consumerId,
-                                             List<Triple<Long, Long, 
ConcurrentBitSetRecyclable>> entries,
+                                             List<Triple<Long, Long, 
ConcurrentBitSet>> entries,
                                              long requestId) {
         BaseCommand cmd = newMultiMessageAckCommon(entries);
         cmd.getAck()
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
index 0ba409b2d7d..d29e4b8240f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
@@ -27,6 +27,7 @@ import java.util.BitSet;
 /**
  * Safe multithreaded version of {@code BitSet} and leverage netty recycler.
  */
+@Deprecated
 @EqualsAndHashCode(callSuper = true)
 public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
 

Reply via email to