This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 10aaedf8acbe58a669b56e9b4c574a652ede550b Author: Oneby Wang <[email protected]> AuthorDate: Wed Feb 4 18:46:37 2026 +0800 [fix][client] Fix race condition between isDuplicate() and flushAsync() method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty Recycler (#25208) (cherry picked from commit 5aab2f00f37c56303cb4efb86b3728d055eed7b1) --- .../apache/pulsar/client/impl/ConsumerImpl.java | 11 +++-- .../PersistentAcknowledgmentsGroupingTracker.java | 25 +++++------ .../impl/AcknowledgementsGroupingTrackerTest.java | 50 ++++++++++++++++++++++ .../apache/pulsar/common/protocol/Commands.java | 11 +++-- .../collections/ConcurrentBitSetRecyclable.java | 1 + 5 files changed, 74 insertions(+), 24 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fff1ba41837..5b745de480a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -126,7 +126,7 @@ import org.apache.pulsar.common.util.ExceptionHandler; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; @@ -3008,7 +3008,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } else { if (Commands.peerSupportsMultiMessageAcknowledgment( getClientCnx().getRemoteEndpointProtocolVersion())) { - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { @@ -3046,7 +3046,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries, + List<Triple<Long, Long, ConcurrentBitSet>> entries, long requestID) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() @@ -3065,7 +3065,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } }; - private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) { + private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) { BaseCommand cmd = LOCAL_BASE_COMMAND.get() .clear() .setType(BaseCommand.Type.ACK); @@ -3074,7 +3074,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); long entryId = entries.get(i).getMiddle(); - ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + ConcurrentBitSet bitSet = entries.get(i).getRight(); MessageIdData msgId = ack.addMessageId() .setLedgerId(ledgerId) .setEntryId(entryId); @@ -3083,7 +3083,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle for (int j = 0; j < ackSet.length; j++) { msgId.addAckSet(ackSet[j]); } - bitSet.recycle(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 5e462d9e25a..0bdda067f83 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -52,7 +52,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; /** * Group the acknowledgements for a certain time and then sends them out in a single protobuf command. @@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments */ private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks; @VisibleForTesting - final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks; + final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet> pendingIndividualBatchIndexAcks; private final ScheduledFuture<?> scheduledTask; private final boolean batchIndexAckEnabled; @@ -133,7 +133,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments return true; } if (messageIdAdv.getBatchIndex() >= 0) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key); + ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key); return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex()); } return false; @@ -327,21 +327,22 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments @VisibleForTesting CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( + ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( MessageIdAdvUtils.discardBatch(msgId), __ -> { final BitSet ackSet = msgId.getAckSet(); - final ConcurrentBitSetRecyclable value; + final ConcurrentBitSet value; if (ackSet != null) { synchronized (ackSet) { if (!ackSet.isEmpty()) { - value = ConcurrentBitSetRecyclable.create(ackSet); + value = new ConcurrentBitSet(); + value.or(ackSet); } else { - value = ConcurrentBitSetRecyclable.create(); + value = new ConcurrentBitSet(); value.set(0, msgId.getBatchSize()); } } } else { - value = ConcurrentBitSetRecyclable.create(); + value = new ConcurrentBitSet(); value.set(0, msgId.getBatchSize()); } return value; @@ -445,7 +446,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } // Flush all individual acks - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); if (!pendingIndividualAcks.isEmpty()) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { @@ -487,7 +488,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } while (true) { - Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = + Map.Entry<MessageIdAdv, ConcurrentBitSet> entry = pendingIndividualBatchIndexAcks.pollFirstEntry(); if (entry == null) { // The entry has been removed in a different thread @@ -539,7 +540,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments // cumulative ack chunk by the last messageId if (chunkMsgIds != null && ackType != AckType.Cumulative) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length); + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); @@ -568,7 +569,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments long entryId, BitSetRecyclable ackSet, AckType ackType, Map<String, Long> properties, boolean flush, TimedCompletableFuture<Void> timedCompletableFuture, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) { + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) { if (consumer.isAckReceiptEnabled()) { final long requestId = consumer.getClient().newRequestId(); final ByteBuf cmd; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index ebcc99f8557..a3254b10949 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -31,8 +31,10 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -407,6 +409,54 @@ public class AcknowledgementsGroupingTrackerTest { tracker.close(); } + @Test + public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws Exception { + ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); + conf.setMaxAcknowledgmentGroupSize(1); + PersistentAcknowledgmentsGroupingTracker tracker = + new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null); + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null); + + int loops = 10000; + int addAcknowledgmentThreadCount = 10; + List<Thread> addAcknowledgmentThreads = new ArrayList<>(addAcknowledgmentThreadCount); + for (int i = 0; i < addAcknowledgmentThreadCount; i++) { + Thread addAcknowledgmentThread = new Thread(() -> { + for (int j = 0; j < loops; j++) { + tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap()); + } + }, "doIndividualBatchAck-thread-" + i); + addAcknowledgmentThread.start(); + addAcknowledgmentThreads.add(addAcknowledgmentThread); + } + + int isDuplicateThreadCount = 10; + AtomicBoolean assertResult = new AtomicBoolean(); + List<Thread> isDuplicateThreads = new ArrayList<>(isDuplicateThreadCount); + for (int i = 0; i < isDuplicateThreadCount; i++) { + Thread isDuplicateThread = new Thread(() -> { + for (int j = 0; j < loops; j++) { + boolean duplicate = tracker.isDuplicate(batchMessageId1); + assertResult.set(assertResult.get() || duplicate); + } + }, "isDuplicate-thread-" + i); + isDuplicateThread.start(); + isDuplicateThreads.add(isDuplicateThread); + } + + for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) { + addAcknowledgmentThread.join(); + } + + for (Thread isDuplicateThread : isDuplicateThreads) { + isDuplicateThread.join(); + } + + assertFalse(assertResult.get()); + } + public class ClientCnxTest extends ClientCnx { public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index ac2d6d0a95d..fa52318d353 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -105,7 +105,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; @UtilityClass @Slf4j @@ -981,7 +981,7 @@ public class Commands { } public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) { + List<Triple<Long, Long, ConcurrentBitSet>> entries) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() .setConsumerId(consumerId) @@ -991,14 +991,14 @@ public class Commands { return serializeWithSize(cmd); } - private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) { + private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) { BaseCommand cmd = localCmd(Type.ACK); CommandAck ack = cmd.setAck(); int entriesCount = entries.size(); for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); long entryId = entries.get(i).getMiddle(); - ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + ConcurrentBitSet bitSet = entries.get(i).getRight(); MessageIdData msgId = ack.addMessageId() .setLedgerId(ledgerId) .setEntryId(entryId); @@ -1007,7 +1007,6 @@ public class Commands { for (int j = 0; j < ackSet.length; j++) { msgId.addAckSet(ackSet[j]); } - bitSet.recycle(); } } @@ -1015,7 +1014,7 @@ public class Commands { } public static ByteBuf newMultiMessageAck(long consumerId, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries, + List<Triple<Long, Long, ConcurrentBitSet>> entries, long requestId) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java index 0ba409b2d7d..d29e4b8240f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -27,6 +27,7 @@ import java.util.BitSet; /** * Safe multithreaded version of {@code BitSet} and leverage netty recycler. */ +@Deprecated @EqualsAndHashCode(callSuper = true) public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
