This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 04d0323 [pulsar-broker] redelivery tracker use open hashmap (#3585)
04d0323 is described below
commit 04d03233ca64c73188d09c577d2abe8032a9d2cd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Feb 22 14:13:40 2019 -0800
[pulsar-broker] redelivery tracker use open hashmap (#3585)
* [pulsar-broker] redelivery tracker use open hashmap
* fix redel count
---
.../broker/service/InMemoryRedeliveryTracker.java | 25 ++++++++++++++--------
.../pulsar/broker/service/RedeliveryTracker.java | 4 ++--
.../broker/service/RedeliveryTrackerDisabled.java | 4 ++--
3 files changed, 20 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
index 99b38cc..b4e3508 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -18,30 +18,37 @@
*/
package org.apache.pulsar.broker.service;
-import org.apache.bookkeeper.mledger.Position;
-
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
public class InMemoryRedeliveryTracker implements RedeliveryTracker {
- private ConcurrentHashMap<Position, AtomicInteger> trackerCache = new ConcurrentHashMap<>(16);
+ private ConcurrentLongLongPairHashMap trackerCache = new ConcurrentLongLongPairHashMap(256, 1);
@Override
public int incrementAndGetRedeliveryCount(Position position) {
- trackerCache.putIfAbsent(position, new AtomicInteger(0));
- return trackerCache.get(position).incrementAndGet();
+ PositionImpl positionImpl = (PositionImpl) position;
+ LongPair count = trackerCache.get(positionImpl.getLedgerId(), positionImpl.getEntryId());
+ int newCount = (int) (count != null ? count.first + 1 : 1);
+ trackerCache.put(positionImpl.getLedgerId(), positionImpl.getEntryId(), newCount, 0L);
+ return newCount;
}
@Override
public int getRedeliveryCount(Position position) {
- return trackerCache.getOrDefault(position, new AtomicInteger(0)).get();
+ PositionImpl positionImpl = (PositionImpl) position;
+ LongPair count = trackerCache.get(positionImpl.getLedgerId(), positionImpl.getEntryId());
+ return (int) (count!=null ? count.first : 0);
}
@Override
public void remove(Position position) {
- trackerCache.remove(position);
+ PositionImpl positionImpl = (PositionImpl) position;
+ trackerCache.remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
index 0f2e54a..79ea510 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.service;
-import org.apache.bookkeeper.mledger.Position;
-
import java.util.List;
+import org.apache.bookkeeper.mledger.Position;
+
public interface RedeliveryTracker {
int incrementAndGetRedeliveryCount(Position position);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
index 521417f..a930cd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.service;
-import org.apache.bookkeeper.mledger.Position;
-
import java.util.List;
+import org.apache.bookkeeper.mledger.Position;
+
public class RedeliveryTrackerDisabled implements RedeliveryTracker {
public static final RedeliveryTrackerDisabled REDELIVERY_TRACKER_DISABLED = new RedeliveryTrackerDisabled();