This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ca8d33835db [fix][broker] Key_Shared subscription doesn't always 
deliver messages from the replay queue after a consumer disconnects and leaves 
a backlog (#24732)
ca8d33835db is described below

commit ca8d33835dba2b6200c68838af13a8cced41282b
Author: nborisov <[email protected]>
AuthorDate: Fri Sep 12 12:30:49 2025 +0300

    [fix][broker] Key_Shared subscription doesn't always deliver messages from 
the replay queue after a consumer disconnects and leaves a backlog (#24732)
    
    Co-authored-by: Nikolai Borisov <[email protected]>
---
 .../service/ConsumerHashAssignmentsSnapshot.java   |  44 ++++---
 .../broker/service/DrainingHashesTracker.java      |   8 ++
 .../broker/service/ImpactedConsumersResult.java    |  32 +++--
 ...movedHashRanges.java => UpdatedHashRanges.java} |   8 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  17 ++-
 ...istentHashingStickyKeyConsumerSelectorTest.java |  39 ++++--
 .../ConsumerHashAssignmentsSnapshotTest.java       |  49 +++++---
 .../client/api/KeySharedSubscriptionTest.java      | 131 +++++++++++++++++++++
 8 files changed, 266 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
index 18ea927c642..0607711826b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
@@ -72,7 +72,7 @@ public class ConsumerHashAssignmentsSnapshot {
     }
 
     public ImpactedConsumersResult 
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
-        return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, 
assignmentsAfter.hashRangeAssignments);
+        return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments, 
assignmentsAfter.hashRangeAssignments);
     }
 
     /**
@@ -111,28 +111,36 @@ public class ConsumerHashAssignmentsSnapshot {
      * @param mappingAfter the range mapping after the change
      * @return consumers and ranges where the existing range changed
      */
-    static ImpactedConsumersResult 
resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore,
+    static ImpactedConsumersResult 
resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore,
                                                                     
List<HashRangeAssignment> mappingAfter) {
         Map<Range, Pair<Consumer, Consumer>> impactedRanges = 
diffRanges(mappingBefore, mappingAfter);
-        Map<Consumer, SortedSet<Range>> removedRangesByConsumer = 
impactedRanges.entrySet().stream()
-                .collect(IdentityHashMap::new, (resultMap, entry) -> {
-                    Range range = entry.getKey();
-                    // filter out only where the range was removed
-                    Consumer consumerBefore = entry.getValue().getLeft();
-                    if (consumerBefore != null) {
-                        resultMap.computeIfAbsent(consumerBefore, k -> new 
TreeSet<>()).add(range);
-                    }
-                }, IdentityHashMap::putAll);
-        return 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
+        Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new 
IdentityHashMap<>();
+        Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new 
IdentityHashMap<>();
+        impactedRanges.forEach((range, value) -> {
+            Consumer consumerAfter = value.getRight();
+
+            // record the range as added for the consumer that owns it after the change
+            if (consumerAfter != null) {
+                addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new 
TreeSet<>()).add(range);
+            }
+            // filter out only where the range was removed
+            Consumer consumerBefore = value.getLeft();
+            if (consumerBefore != null) {
+                removedRangesByConsumer.computeIfAbsent(consumerBefore, k -> 
new TreeSet<>()).add(range);
+            }
+        });
+        var removedMerged = 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
+        var addedMerged = 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer);
+        return ImpactedConsumersResult.of(removedMerged, addedMerged);
     }
 
-    static ImpactedConsumersResult 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
-            Map<Consumer, SortedSet<Range>> removedRangesByConsumer) {
-        Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new 
IdentityHashMap<>();
-        removedRangesByConsumer.forEach((consumer, ranges) -> {
-            mergedRangesByConsumer.put(consumer, 
RemovedHashRanges.of(mergeOverlappingRanges(ranges)));
+    static Map<Consumer, UpdatedHashRanges> 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
+            Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) {
+        Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new 
IdentityHashMap<>();
+        updatedRangesByConsumer.forEach((consumer, ranges) -> {
+            mergedRangesByConsumer.put(consumer, 
UpdatedHashRanges.of(mergeOverlappingRanges(ranges)));
         });
-        return ImpactedConsumersResult.of(mergedRangesByConsumer);
+        return mergedRangesByConsumer;
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 9bc5c5f1e44..51c45817368 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -400,6 +400,14 @@ public class DrainingHashesTracker {
             } finally {
                 lock.writeLock().unlock();
             }
+
+            // update the consumer specific stats
+            ConsumerDrainingHashesStats drainingHashesStats =
+                    consumerDrainingHashesStatsMap.get(new 
ConsumerIdentityWrapper(consumer));
+            if (drainingHashesStats != null) {
+                drainingHashesStats.clearHash(stickyKeyHash);
+            }
+
             return false;
         }
         // increment the blocked count which is used to determine if the hash 
is blocking
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
index a525b0501d7..bb888db1ca3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
@@ -30,22 +30,27 @@ import lombok.ToString;
 @EqualsAndHashCode
 @ToString
 public class ImpactedConsumersResult {
-    public interface RemovedHashRangesProcessor {
-        void process(Consumer consumer, RemovedHashRanges removedHashRanges);
+    public interface UpdatedHashRangesProcessor {
+        void process(Consumer consumer, UpdatedHashRanges updatedHashRanges, 
OperationType operationType);
     }
 
-    private final Map<Consumer, RemovedHashRanges> removedHashRanges;
+    private final Map<Consumer, UpdatedHashRanges> removedHashRanges;
+    private final Map<Consumer, UpdatedHashRanges> addedHashRanges;
 
-    private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges> 
removedHashRanges) {
+    private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges> 
removedHashRanges,
+                                    Map<Consumer, UpdatedHashRanges> 
addedHashRanges) {
         this.removedHashRanges = removedHashRanges;
+        this.addedHashRanges = addedHashRanges;
     }
 
-    public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges> 
removedHashRanges) {
-        return new ImpactedConsumersResult(removedHashRanges);
+    public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges> 
removedHashRanges,
+                                             Map<Consumer, UpdatedHashRanges> 
addedHashRanges) {
+        return new ImpactedConsumersResult(removedHashRanges, addedHashRanges);
     }
 
-    public void processRemovedHashRanges(RemovedHashRangesProcessor processor) 
{
-        removedHashRanges.forEach((c, r) -> processor.process(c, r));
+    public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor) 
{
+        removedHashRanges.forEach((c, r) -> processor.process(c, r, 
OperationType.REMOVE));
+        addedHashRanges.forEach((c, r) -> processor.process(c, r, 
OperationType.ADD));
     }
 
     public boolean isEmpty() {
@@ -53,7 +58,16 @@ public class ImpactedConsumersResult {
     }
 
     @VisibleForTesting
-    Map<Consumer, RemovedHashRanges> getRemovedHashRanges() {
+    Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() {
         return removedHashRanges;
     }
+
+    @VisibleForTesting
+    Map<Consumer, UpdatedHashRanges> getAddedHashRanges() {
+        return addedHashRanges;
+    }
+
+    public enum OperationType {
+        ADD, REMOVE
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
similarity index 94%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
index d9bd502255b..e6f3e6709cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
@@ -29,10 +29,10 @@ import org.apache.pulsar.client.api.Range;
  */
 @EqualsAndHashCode
 @ToString
-public class RemovedHashRanges {
+public class UpdatedHashRanges {
     private final Range[] sortedRanges;
 
-    private RemovedHashRanges(List<Range> ranges) {
+    private UpdatedHashRanges(List<Range> ranges) {
         // Converts the set of ranges to an array to avoid iterator allocation
         // when the ranges are iterator multiple times in the pending 
acknowledgments loop.
         this.sortedRanges = ranges.toArray(new Range[0]);
@@ -52,8 +52,8 @@ public class RemovedHashRanges {
         }
     }
 
-    public static RemovedHashRanges of(List<Range> ranges) {
-        return new RemovedHashRanges(ranges);
+    public static UpdatedHashRanges of(List<Range> ranges) {
+        return new UpdatedHashRanges(ranges);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 9e92a2ab40d..d803bc7d963 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,16 +170,25 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private synchronized void registerDrainingHashes(Consumer skipConsumer,
                                                      ImpactedConsumersResult 
impactedConsumers) {
-        impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
+        impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges, 
opType) -> {
             if (c != skipConsumer) {
                 c.getPendingAcks().forEach((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
                     if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
                         log.warn("[{}] Sticky key hash was missing for {}:{}", 
getName(), ledgerId, entryId);
                         return;
                     }
-                    if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
-                        // add the pending ack to the draining hashes tracker 
if the hash is in the range
-                        drainingHashesTracker.addEntry(c, stickyKeyHash);
+                    if (updatedHashRanges.containsStickyKey(stickyKeyHash)) {
+                        switch (opType) {
+                            //reduce ref count in case the stickyKeyHash was 
re-assigned to the original consumer
+                            case ADD -> {
+                                var entry = 
drainingHashesTracker.getEntry(stickyKeyHash);
+                                if (entry != null && entry.getConsumer() == c) 
{
+                                    drainingHashesTracker.reduceRefCount(c, 
stickyKeyHash, false);
+                                }
+                            }
+                            // add the pending ack to the draining hashes 
tracker if the hash is in the range
+                            case REMOVE -> drainingHashesTracker.addEntry(c, 
stickyKeyHash);
+                        }
                     }
                 });
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 82891bfe9c4..d02b9ca27a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -36,7 +36,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.mutable.MutableInt;
-import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
 import org.assertj.core.data.Offset;
 import org.mockito.Mockito;
@@ -47,7 +46,7 @@ import org.testng.annotations.Test;
 public class ConsistentHashingStickyKeyConsumerSelectorTest {
 
     @Test
-    public void testConsumerSelect() throws ConsumerAssignException {
+    public void testConsumerSelect() {
 
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(200);
         String key1 = "anyKey";
@@ -151,7 +150,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest 
{
 
 
     @Test
-    public void testGetConsumerKeyHashRanges() throws 
BrokerServiceException.ConsumerAssignException {
+    public void testGetConsumerKeyHashRanges() {
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(3);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", 
"consumer3");
         List<Consumer> consumers = new ArrayList<>();
@@ -201,8 +200,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest 
{
     }
 
     @Test
-    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
-            throws BrokerServiceException.ConsumerAssignException {
+    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() {
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(200);
         List<Consumer> consumers = new ArrayList<>();
         for (int i = 0; i < 20; i++) {
@@ -530,11 +528,24 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
             selector.removeConsumer(consumer);
 
             ConsumerHashAssignmentsSnapshot assignmentsAfter = 
selector.getConsumerHashAssignmentsSnapshot();
-            
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges())
+            ImpactedConsumersResult impactedConsumersAfterRemoval = 
assignmentsBefore
+                    .resolveImpactedConsumers(assignmentsAfter);
+            assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges())
                     .describedAs(
                             "when a consumer is removed, the removed hash 
ranges should only be from "
                                     + "the removed consumer")
                     .containsOnlyKeys(consumer);
+            List<Range> allAddedRangesAfterRemoval = 
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
+                    
impactedConsumersAfterRemoval.getAddedHashRanges().values().stream()
+                            
.map(UpdatedHashRanges::asRanges).flatMap(List::stream)
+                            .collect(Collectors.toCollection(TreeSet::new))
+            );
+            assertThat(allAddedRangesAfterRemoval)
+                    .describedAs(
+                            "when a consumer is removed, all its hash ranges 
should appear "
+                                    + "in added hash ranges"
+                    )
+                    
.containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer));
             assignmentsBefore = assignmentsAfter;
 
             // add consumer back
@@ -543,8 +554,9 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest 
{
             assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
             List<Range> addedConsumerRanges = 
assignmentsAfter.getRangesByConsumer().get(consumer);
 
-            Map<Consumer, RemovedHashRanges> removedHashRanges =
-                    
assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
+            ImpactedConsumersResult impactedConsumersAfterAdding = 
assignmentsBefore
+                    .resolveImpactedConsumers(assignmentsAfter);
+            Map<Consumer, UpdatedHashRanges> removedHashRanges = 
impactedConsumersAfterAdding.getRemovedHashRanges();
             ConsumerHashAssignmentsSnapshot finalAssignmentsBefore = 
assignmentsBefore;
             assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> {
                 assertThat(removedHashRange
@@ -558,12 +570,19 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
 
             List<Range> allRemovedRanges =
                     ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                            
removedHashRanges.entrySet().stream().map(Map.Entry::getValue)
-                                    .map(RemovedHashRanges::asRanges)
+                            removedHashRanges.values().stream()
+                                    .map(UpdatedHashRanges::asRanges)
                                     
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
             assertThat(allRemovedRanges)
                     .describedAs("all removed ranges should be the same as the 
ranges of the added consumer")
                     .containsExactlyElementsOf(addedConsumerRanges);
+            List<Range> allAddedRangesAfterAdding = 
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
+                    
impactedConsumersAfterAdding.getAddedHashRanges().values().stream()
+                            .map(UpdatedHashRanges::asRanges)
+                            
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
+            assertThat(addedConsumerRanges)
+                    .describedAs("all added ranges should be the same as the 
ranges of the added consumer")
+                    .containsExactlyElementsOf(allAddedRangesAfterAdding);
 
             assignmentsBefore = assignmentsAfter;
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
index 5c886b6eec9..93e53aba7b3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
@@ -129,14 +129,16 @@ public class ConsumerHashAssignmentsSnapshotTest {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
+        Range notModifiedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
-        mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
-        mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+        mappingBefore.add(new HashRangeAssignment(notModifiedRange, 
consumer1));
+        mappingAfter.add(new HashRangeAssignment(notModifiedRange, consumer1));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
 
         assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
+        assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
     }
 
     @Test
@@ -144,49 +146,59 @@ public class ConsumerHashAssignmentsSnapshotTest {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
+        Range reAssignedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
         Consumer consumer2 = createMockConsumer("consumer2");
-        mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
-        mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer2));
+        mappingBefore.add(new HashRangeAssignment(reAssignedRange, consumer1));
+        mappingAfter.add(new HashRangeAssignment(reAssignedRange, consumer2));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
 
         
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 
5)))));
+                Map.of(consumer1, 
UpdatedHashRanges.of(List.of(reAssignedRange))));
+        
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
+                Map.of(consumer2, 
UpdatedHashRanges.of(List.of(reAssignedRange)))
+        );
     }
 
     @Test
-    public void testResolveConsumerRemovedHashRanges_RangeAdded() {
+    public void testResolveConsumerUpdatedHashRanges_RangeAdded() {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
+        Range addedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
-        mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+        mappingAfter.add(new HashRangeAssignment(addedRange, consumer1));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
 
         assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
+        
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
+                Map.of(consumer1, UpdatedHashRanges.of(List.of(addedRange)))
+        );
     }
 
     @Test
-    public void testResolveConsumerRemovedHashRanges_RangeRemoved() {
+    public void testResolveConsumerRemovedHashRanges_RangeUpdated() {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
+        Range removedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
-        mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+        mappingBefore.add(new HashRangeAssignment(removedRange, consumer1));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
 
         
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 
5)))));
+                Map.of(consumer1, 
UpdatedHashRanges.of(List.of(removedRange))));
+        assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
     }
 
     @Test
-    public void testResolveConsumerRemovedHashRanges_OverlappingRanges() {
+    public void testResolveConsumerUpdatedHashRanges_OverlappingRanges() {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
@@ -196,9 +208,12 @@ public class ConsumerHashAssignmentsSnapshotTest {
         mappingAfter.add(new HashRangeAssignment(Range.of(3, 7), consumer2));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
 
         
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(3, 
5)))));
+                Map.of(consumer1, UpdatedHashRanges.of(List.of(Range.of(3, 
5)))));
+        
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
+                Map.of(consumer2, UpdatedHashRanges.of(List.of(Range.of(3, 
7))))
+        );
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index af7c32192c7..80b5070077d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,6 +30,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Duration;
@@ -919,6 +920,136 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    /*
+        Verifies that https://github.com/apache/pulsar/issues/23845 is fixed.
+
+        The scenario of verification
+
+        Preconditions:
+
+        1. Consumer "consumer" up and running
+        2. Producer "producer" connected to broker
+
+        Steps to reproduce:
+        1. "producer" sends a message with key "testMessageKey"
+        Example: "testMessage" hash is always 124
+
+        2. "consumer" receives BUT DOES NOT acknowledge the message with key 
"testMessageKey"
+        Example: "consumer" hash range: [0-256]. "testMessage" hash range: 
[0-256]
+
+        3. Add more consumers until hash range of message with key 
"testMessageKey" is removed from "consumer" hash ranges
+        Example: "consumer" hash range: [0-123]. "consumer N" hash range: 
[124-136]. "testMessage" hash range: [124-136]
+
+        4. Stop the added consumers until message key "testMessageKey" hash 
range is moved back to "consumer" hash ranges
+        Example: "consumer" hash range: [0-128]. "testMessage" hash range: 
[0-128]
+
+        5. Add more consumers until hash range of message with key 
"testMessageKey" is removed from "consumer" hash ranges
+        Example: "consumer" hash range: [0-96]. "consumer N" hash range: 
[121-154]. "testMessage" hash range: [121-154]
+
+        6. Stop "consumer"
+
+        Actual result for bug:
+        No message with key "testMessageKey" received by consumer who owns 
message's hash range
+
+        Expected result:
+        The message with key "testMessageKey" received by consumer who owns 
message's hash range
+        Example: "consumer N" with hash range [121-154] received the message
+
+     */
+    @Test
+    void testMessageDeliveredFromDrainingHashes() throws PulsarClientException 
{
+        String messageKey = "testMessageKey";
+        String topic = "testMessageDeliveredFromDrainingHashes" + 
UUID.randomUUID();
+
+        List<Consumer<Integer>> consumers = new ArrayList<>();
+
+        @Cleanup
+        Consumer<Integer> consumer = createConsumer(topic);
+        String initialOwnerName = consumer.getConsumerName();
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+        producer.newMessage().key(messageKey).value(1).send();
+
+        // receive but not acknowledge the message to leave it in pending acks
+        Message<Integer> received = consumer.receive();
+        assertThat(received.getKey()).isEqualTo(messageKey);
+
+        StickyKeyDispatcher dispatcher = getDispatcher(topic, 
SUBSCRIPTION_NAME);
+        StickyKeyConsumerSelector selector = dispatcher.getSelector();
+        int messageKeyHash = 
selector.makeStickyKeyHash(messageKey.getBytes(StandardCharsets.UTF_8));
+
+        try {
+            // add new consumers until hash range of the initial message owner 
moved
+            Pair<String, List<Consumer<Integer>>> addedConsumers = 
addConsumersUntilOwnerChanged(
+                    topic, initialOwnerName, messageKeyHash, selector
+            );
+            String currentOwnerName = addedConsumers.getKey();
+            consumers.addAll(addedConsumers.getValue());
+
+            // returning hash range to the initial owner
+            for (int i = consumers.size() - 1; i > -1 && 
!initialOwnerName.equals(currentOwnerName); i--) {
+                consumers.remove(i).close();
+                currentOwnerName = findOwnerName(selector, messageKeyHash);
+            }
+
+            // add new consumers until hash range of the initial message owner 
moved
+            Pair<String, List<Consumer<Integer>>> addedConsumersAfter = 
addConsumersUntilOwnerChanged(
+                    topic, initialOwnerName, messageKeyHash, selector
+            );
+
+            String currentOwnerNameAfter = addedConsumersAfter.getKey();
+            consumers.addAll(addedConsumersAfter.getValue());
+
+            // remove the initial owner
+            consumer.close();
+
+            // verify the message sent to new consumer which owns hash range
+            Optional<Consumer<Integer>> theNewOwnerMaybe = consumers
+                    .stream()
+                    .filter(c -> 
c.getConsumerName().equals(currentOwnerNameAfter))
+                    .findAny();
+
+            assertThat(theNewOwnerMaybe).isPresent();
+
+            Consumer<Integer> theNewOwner = theNewOwnerMaybe.get();
+
+            Message<Integer> message = theNewOwner.receive(5, 
TimeUnit.SECONDS);
+            assertThat(message).describedAs("Message with key " + messageKey
+                    + " expected to be delivered to consumer " + 
theNewOwner.getConsumerName()).isNotNull();
+            assertThat(message.getKey()).isEqualTo(messageKey);
+        } finally {
+            for (Consumer<Integer> c : consumers) {
+                c.close();
+            }
+        }
+    }
+
+    private Pair<String, List<Consumer<Integer>>> 
addConsumersUntilOwnerChanged(
+            String topic,
+            String initialOwnerName,
+            int messageKeyHash,
+            StickyKeyConsumerSelector selector
+    ) throws PulsarClientException {
+        String currentOwnerName;
+        List<Consumer<Integer>> consumers = new ArrayList<>();
+        do {
+            Consumer<Integer> addedC = createConsumer(topic);
+            consumers.add(addedC);
+            currentOwnerName = findOwnerName(selector, messageKeyHash);
+        } while (initialOwnerName.equals(currentOwnerName));
+        return Pair.of(currentOwnerName, consumers);
+    }
+
+    private String findOwnerName(StickyKeyConsumerSelector selector, int hash) 
{
+        return selector.getConsumerKeyHashRanges().entrySet().stream()
+                .filter(entry ->
+                        entry.getValue().stream().anyMatch(range -> 
range.contains(hash))
+                )
+                .findAny().orElseThrow(() -> new IllegalArgumentException("No 
owner for the hash " + hash))
+                .getKey().consumerName();
+    }
+
     @Test(dataProvider = "currentImplementationType")
     public void testAttachKeyToMessageMetadata(KeySharedImplementationType 
impl) throws PulsarClientException {
         String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();

Reply via email to