This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d0323  [pulsar-broker] redelivery tracker use open hashmap (#3585)
04d0323 is described below

commit 04d03233ca64c73188d09c577d2abe8032a9d2cd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Feb 22 14:13:40 2019 -0800

    [pulsar-broker] redelivery tracker use open hashmap (#3585)
    
    * [pulsar-broker] redelivery tracker use open hashmap
    
    * fix redel count
---
 .../broker/service/InMemoryRedeliveryTracker.java  | 25 ++++++++++++++--------
 .../pulsar/broker/service/RedeliveryTracker.java   |  4 ++--
 .../broker/service/RedeliveryTrackerDisabled.java  |  4 ++--
 3 files changed, 20 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
index 99b38cc..b4e3508 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -18,30 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.bookkeeper.mledger.Position;
-
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 
 public class InMemoryRedeliveryTracker implements RedeliveryTracker {
 
-    private ConcurrentHashMap<Position, AtomicInteger> trackerCache = new 
ConcurrentHashMap<>(16);
+    private ConcurrentLongLongPairHashMap trackerCache = new 
ConcurrentLongLongPairHashMap(256, 1);
 
     @Override
     public int incrementAndGetRedeliveryCount(Position position) {
-        trackerCache.putIfAbsent(position, new AtomicInteger(0));
-        return trackerCache.get(position).incrementAndGet();
+        PositionImpl positionImpl = (PositionImpl) position;
+        LongPair count = trackerCache.get(positionImpl.getLedgerId(), 
positionImpl.getEntryId());
+        int newCount = (int) (count != null ? count.first + 1 : 1);
+        trackerCache.put(positionImpl.getLedgerId(), 
positionImpl.getEntryId(), newCount, 0L);
+        return newCount;
     }
 
     @Override
     public int getRedeliveryCount(Position position) {
-        return trackerCache.getOrDefault(position, new AtomicInteger(0)).get();
+        PositionImpl positionImpl = (PositionImpl) position;
+        LongPair count = trackerCache.get(positionImpl.getLedgerId(), 
positionImpl.getEntryId());
+        return (int) (count!=null ? count.first : 0);
     }
 
     @Override
     public void remove(Position position) {
-        trackerCache.remove(position);
+        PositionImpl positionImpl = (PositionImpl) position;
+        trackerCache.remove(positionImpl.getLedgerId(), 
positionImpl.getEntryId());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
index 0f2e54a..79ea510 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.bookkeeper.mledger.Position;
-
 import java.util.List;
 
+import org.apache.bookkeeper.mledger.Position;
+
 public interface RedeliveryTracker {
 
     int incrementAndGetRedeliveryCount(Position position);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
index 521417f..a930cd7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.bookkeeper.mledger.Position;
-
 import java.util.List;
 
+import org.apache.bookkeeper.mledger.Position;
+
 public class RedeliveryTrackerDisabled implements RedeliveryTracker {
 
     public static final RedeliveryTrackerDisabled REDELIVERY_TRACKER_DISABLED 
= new RedeliveryTrackerDisabled();

Reply via email to