This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5aab2f00f37 [fix][client] Fix race condition between isDuplicate() and
flushAsync() methods in PersistentAcknowledgmentsGroupingTracker due to
incorrect use of Netty Recycler (#25208)
5aab2f00f37 is described below
commit 5aab2f00f37c56303cb4efb86b3728d055eed7b1
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Feb 4 18:46:37 2026 +0800
[fix][client] Fix race condition between isDuplicate() and flushAsync()
methods in PersistentAcknowledgmentsGroupingTracker due to incorrect use of
Netty Recycler (#25208)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 +++--
.../PersistentAcknowledgmentsGroupingTracker.java | 25 +++++------
.../impl/AcknowledgementsGroupingTrackerTest.java | 50 ++++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 11 +++--
.../collections/ConcurrentBitSetRecyclable.java | 1 +
5 files changed, 74 insertions(+), 24 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 37e9f16fe02..adecd97564f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -136,7 +136,7 @@ import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -3187,7 +3187,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion())) {
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entriesToAck =
+ List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
@@ -3225,7 +3225,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
- List<Triple<Long, Long,
ConcurrentBitSetRecyclable>> entries,
+ List<Triple<Long, Long,
ConcurrentBitSet>> entries,
long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
@@ -3244,7 +3244,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
};
- private static BaseCommand newMultiMessageAckCommon(List<Triple<Long,
Long, ConcurrentBitSetRecyclable>> entries) {
+ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long,
Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
.clear()
.setType(BaseCommand.Type.ACK);
@@ -3253,7 +3253,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
- ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+ ConcurrentBitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
@@ -3262,7 +3262,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
- bitSet.recycle();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index b814d261fd7..0598dc4fb36 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.jspecify.annotations.Nullable;
/**
@@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
*/
private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
@VisibleForTesting
- final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable>
pendingIndividualBatchIndexAcks;
+ final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet>
pendingIndividualBatchIndexAcks;
private final ScheduledFuture<?> scheduledTask;
private final boolean batchIndexAckEnabled;
@@ -133,7 +133,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
return true;
}
if (messageIdAdv.getBatchIndex() >= 0) {
- ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.get(key);
+ ConcurrentBitSet bitSet =
pendingIndividualBatchIndexAcks.get(key);
return bitSet != null &&
!bitSet.get(messageIdAdv.getBatchIndex());
}
return false;
@@ -327,21 +327,22 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
@VisibleForTesting
CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
- ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.computeIfAbsent(
+ ConcurrentBitSet bitSet =
pendingIndividualBatchIndexAcks.computeIfAbsent(
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
- final ConcurrentBitSetRecyclable value;
+ final ConcurrentBitSet value;
if (ackSet != null) {
synchronized (ackSet) {
if (!ackSet.isEmpty()) {
- value =
ConcurrentBitSetRecyclable.create(ackSet);
+ value = new ConcurrentBitSet();
+ value.or(ackSet);
} else {
- value = ConcurrentBitSetRecyclable.create();
+ value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
}
} else {
- value = ConcurrentBitSetRecyclable.create();
+ value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
return value;
@@ -445,7 +446,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
// Flush all individual acks
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
+ List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() +
pendingIndividualBatchIndexAcks.size());
if (!pendingIndividualAcks.isEmpty()) {
if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
@@ -487,7 +488,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
while (true) {
- Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
+ Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
pendingIndividualBatchIndexAcks.pollFirstEntry();
if (entry == null) {
// The entry has been removed in a different thread
@@ -539,7 +540,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
// cumulative ack chunk by the last messageId
if (chunkMsgIds != null && ackType != AckType.Cumulative) {
if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>>
entriesToAck = new ArrayList<>(chunkMsgIds.length);
+ List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new
ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(),
cMsgId.getEntryId(), null));
@@ -568,7 +569,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
long entryId, BitSetRecyclable ackSet, AckType ackType,
Map<String, Long> properties, boolean flush,
TimedCompletableFuture<Void> timedCompletableFuture,
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck)
{
+ List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
if (consumer.isAckReceiptEnabled()) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 7a8222473a3..1f46f9da3a4 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -31,8 +31,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -405,6 +407,54 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
+ @Test
+ public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws
Exception {
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ conf.setMaxAcknowledgmentGroupSize(1);
+ PersistentAcknowledgmentsGroupingTracker tracker =
+ new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
+
+ BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0,
0, 10, null);
+ BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0,
1, 10, null);
+
+ int loops = 10000;
+ int addAcknowledgmentThreadCount = 10;
+ List<Thread> addAcknowledgmentThreads = new
ArrayList<>(addAcknowledgmentThreadCount);
+ for (int i = 0; i < addAcknowledgmentThreadCount; i++) {
+ Thread addAcknowledgmentThread = new Thread(() -> {
+ for (int j = 0; j < loops; j++) {
+ tracker.addAcknowledgment(batchMessageId0,
AckType.Individual, Collections.emptyMap());
+ }
+ }, "doIndividualBatchAck-thread-" + i);
+ addAcknowledgmentThread.start();
+ addAcknowledgmentThreads.add(addAcknowledgmentThread);
+ }
+
+ int isDuplicateThreadCount = 10;
+ AtomicBoolean assertResult = new AtomicBoolean();
+ List<Thread> isDuplicateThreads = new
ArrayList<>(isDuplicateThreadCount);
+ for (int i = 0; i < isDuplicateThreadCount; i++) {
+ Thread isDuplicateThread = new Thread(() -> {
+ for (int j = 0; j < loops; j++) {
+ boolean duplicate = tracker.isDuplicate(batchMessageId1);
+ assertResult.set(assertResult.get() || duplicate);
+ }
+ }, "isDuplicate-thread-" + i);
+ isDuplicateThread.start();
+ isDuplicateThreads.add(isDuplicateThread);
+ }
+
+ for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) {
+ addAcknowledgmentThread.join();
+ }
+
+ for (Thread isDuplicateThread : isDuplicateThreads) {
+ isDuplicateThread.join();
+ }
+
+ assertFalse(assertResult.get());
+ }
+
public class ClientCnxTest extends ClientCnx {
public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index a6796387f91..deca71e9361 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -112,7 +112,7 @@ import
org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
@UtilityClass
@Slf4j
@@ -1035,7 +1035,7 @@ public class Commands {
}
public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID
txnID,
- List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
+ List<Triple<Long, Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
.setConsumerId(consumerId)
@@ -1045,14 +1045,14 @@ public class Commands {
return serializeWithSize(cmd);
}
- private static BaseCommand newMultiMessageAckCommon(List<Triple<Long,
Long, ConcurrentBitSetRecyclable>> entries) {
+ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long,
Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = localCmd(Type.ACK);
CommandAck ack = cmd.setAck();
int entriesCount = entries.size();
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
- ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+ ConcurrentBitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
@@ -1061,7 +1061,6 @@ public class Commands {
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
- bitSet.recycle();
}
}
@@ -1069,7 +1068,7 @@ public class Commands {
}
public static ByteBuf newMultiMessageAck(long consumerId,
- List<Triple<Long, Long,
ConcurrentBitSetRecyclable>> entries,
+ List<Triple<Long, Long,
ConcurrentBitSet>> entries,
long requestId) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
index 0ba409b2d7d..d29e4b8240f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
@@ -27,6 +27,7 @@ import java.util.BitSet;
/**
* Safe multithreaded version of {@code BitSet} and leverage netty recycler.
*/
+@Deprecated
@EqualsAndHashCode(callSuper = true)
public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {