This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 14543d3fde9 [fix][client] Avoid recycling the same
ConcurrentBitSetRecyclable among different threads (#24725)
14543d3fde9 is described below
commit 14543d3fde935d1b70e3707a8b2c0294eb53dccb
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Sep 11 22:19:35 2025 +0800
[fix][client] Avoid recycling the same ConcurrentBitSetRecyclable among
different threads (#24725)
---
.../PersistentAcknowledgmentsGroupingTracker.java | 29 +++++++++++-----------
.../impl/AcknowledgementsGroupingTrackerTest.java | 28 ++++++---------------
2 files changed, 22 insertions(+), 35 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 5f7957d7f1d..b814d261fd7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.FastThreadLocal;
@@ -26,13 +27,12 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -82,7 +82,8 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
* broker.
*/
private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
- private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>
pendingIndividualBatchIndexAcks;
+ @VisibleForTesting
+ final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable>
pendingIndividualBatchIndexAcks;
private final ScheduledFuture<?> scheduledTask;
private final boolean batchIndexAckEnabled;
@@ -92,7 +93,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
EventLoopGroup
eventLoopGroup) {
this.consumer = consumer;
this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
- this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
+ this.pendingIndividualBatchIndexAcks = new ConcurrentSkipListMap<>();
this.acknowledgementGroupTimeMicros =
conf.getAcknowledgementsGroupTimeMicros();
this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize();
this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
@@ -324,7 +325,8 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv
msgId) {
+ @VisibleForTesting
+ CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.computeIfAbsent(
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
@@ -484,16 +486,15 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- if (!pendingIndividualBatchIndexAcks.isEmpty()) {
- Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>>
iterator =
- pendingIndividualBatchIndexAcks.entrySet().iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
iterator.next();
- entriesToAck.add(Triple.of(
- entry.getKey().getLedgerId(),
entry.getKey().getEntryId(), entry.getValue()));
- iterator.remove();
+ while (true) {
+ Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
+ pendingIndividualBatchIndexAcks.pollFirstEntry();
+ if (entry == null) {
+ // The entry has been removed in a different thread
+ break;
}
+ entriesToAck.add(Triple.of(
+ entry.getKey().getLedgerId(), entry.getKey().getEntryId(),
entry.getValue()));
}
if (entriesToAck.size() > 0) {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index bbf12654899..7a8222473a3 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -31,23 +31,18 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -387,27 +382,18 @@ public class AcknowledgementsGroupingTrackerTest {
}
@Test
- public void testDoIndividualBatchAckAsync() throws Exception{
+ public void testDoIndividualBatchAckAsync() {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
- AcknowledgmentsGroupingTracker tracker =
- new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
- MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null);
+ var tracker = new PersistentAcknowledgmentsGroupingTracker(consumer,
conf, eventLoopGroup);
+ var messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null);
BitSet bitSet = new BitSet(20);
for (int i = 0; i < 20; i++) {
bitSet.set(i, true);
}
- MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet);
- Method doIndividualBatchAckAsync =
PersistentAcknowledgmentsGroupingTracker.class
- .getDeclaredMethod("doIndividualBatchAckAsync",
MessageIdAdv.class);
- doIndividualBatchAckAsync.setAccessible(true);
- doIndividualBatchAckAsync.invoke(tracker, messageId1);
- doIndividualBatchAckAsync.invoke(tracker, messageId2);
- Field pendingIndividualBatchIndexAcks =
-
PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
- pendingIndividualBatchIndexAcks.setAccessible(true);
- ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>
batchIndexAcks =
- (ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>)
pendingIndividualBatchIndexAcks
- .get(tracker);
+ var messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet);
+ tracker.doIndividualBatchAckAsync(messageId1);
+ tracker.doIndividualBatchAckAsync(messageId2);
+ var batchIndexAcks = tracker.pendingIndividualBatchIndexAcks;
MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
assertTrue(batchIndexAcks.containsKey(position1));