This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3465d2a1064 Revert "[fix][broker] Key_Shared subscription doesn't always deliver messages from the replay queue after a consumer disconnects and leaves a backlog" (#24735)
3465d2a1064 is described below

commit 3465d2a1064b3c9a02c8475d368f33c3f9d4b847
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Sep 12 12:34:23 2025 +0300

    Revert "[fix][broker] Key_Shared subscription doesn't always deliver messages from the replay queue after a consumer disconnects and leaves a backlog" (#24735)
    
    The PR should target the master branch.
---
 .../service/ConsumerHashAssignmentsSnapshot.java   |  44 +++----
 .../broker/service/DrainingHashesTracker.java      |   8 --
 .../broker/service/ImpactedConsumersResult.java    |  32 ++---
 ...datedHashRanges.java => RemovedHashRanges.java} |   8 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  17 +--
 ...istentHashingStickyKeyConsumerSelectorTest.java |  39 ++----
 .../ConsumerHashAssignmentsSnapshotTest.java       |  49 +++-----
 .../client/api/KeySharedSubscriptionTest.java      | 131 ---------------------
 8 files changed, 62 insertions(+), 266 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
index 0607711826b..18ea927c642 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
@@ -72,7 +72,7 @@ public class ConsumerHashAssignmentsSnapshot {
     }
 
     public ImpactedConsumersResult 
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
-        return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments, 
assignmentsAfter.hashRangeAssignments);
+        return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, 
assignmentsAfter.hashRangeAssignments);
     }
 
     /**
@@ -111,36 +111,28 @@ public class ConsumerHashAssignmentsSnapshot {
      * @param mappingAfter the range mapping after the change
      * @return consumers and ranges where the existing range changed
      */
-    static ImpactedConsumersResult 
resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore,
+    static ImpactedConsumersResult 
resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore,
                                                                     
List<HashRangeAssignment> mappingAfter) {
         Map<Range, Pair<Consumer, Consumer>> impactedRanges = 
diffRanges(mappingBefore, mappingAfter);
-        Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new 
IdentityHashMap<>();
-        Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new 
IdentityHashMap<>();
-        impactedRanges.forEach((range, value) -> {
-            Consumer consumerAfter = value.getRight();
-
-            // last consumer was removed
-            if (consumerAfter != null) {
-                addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new 
TreeSet<>()).add(range);
-            }
-            // filter out only where the range was removed
-            Consumer consumerBefore = value.getLeft();
-            if (consumerBefore != null) {
-                removedRangesByConsumer.computeIfAbsent(consumerBefore, k -> 
new TreeSet<>()).add(range);
-            }
-        });
-        var removedMerged = 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
-        var addedMerged = 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer);
-        return ImpactedConsumersResult.of(removedMerged, addedMerged);
+        Map<Consumer, SortedSet<Range>> removedRangesByConsumer = 
impactedRanges.entrySet().stream()
+                .collect(IdentityHashMap::new, (resultMap, entry) -> {
+                    Range range = entry.getKey();
+                    // filter out only where the range was removed
+                    Consumer consumerBefore = entry.getValue().getLeft();
+                    if (consumerBefore != null) {
+                        resultMap.computeIfAbsent(consumerBefore, k -> new 
TreeSet<>()).add(range);
+                    }
+                }, IdentityHashMap::putAll);
+        return 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
     }
 
-    static Map<Consumer, UpdatedHashRanges> 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
-            Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) {
-        Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new 
IdentityHashMap<>();
-        updatedRangesByConsumer.forEach((consumer, ranges) -> {
-            mergedRangesByConsumer.put(consumer, 
UpdatedHashRanges.of(mergeOverlappingRanges(ranges)));
+    static ImpactedConsumersResult 
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
+            Map<Consumer, SortedSet<Range>> removedRangesByConsumer) {
+        Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new 
IdentityHashMap<>();
+        removedRangesByConsumer.forEach((consumer, ranges) -> {
+            mergedRangesByConsumer.put(consumer, 
RemovedHashRanges.of(mergeOverlappingRanges(ranges)));
         });
-        return mergedRangesByConsumer;
+        return ImpactedConsumersResult.of(mergedRangesByConsumer);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 51c45817368..9bc5c5f1e44 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -400,14 +400,6 @@ public class DrainingHashesTracker {
             } finally {
                 lock.writeLock().unlock();
             }
-
-            // update the consumer specific stats
-            ConsumerDrainingHashesStats drainingHashesStats =
-                    consumerDrainingHashesStatsMap.get(new 
ConsumerIdentityWrapper(consumer));
-            if (drainingHashesStats != null) {
-                drainingHashesStats.clearHash(stickyKeyHash);
-            }
-
             return false;
         }
         // increment the blocked count which is used to determine if the hash 
is blocking
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
index bb888db1ca3..a525b0501d7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
@@ -30,27 +30,22 @@ import lombok.ToString;
 @EqualsAndHashCode
 @ToString
 public class ImpactedConsumersResult {
-    public interface UpdatedHashRangesProcessor {
-        void process(Consumer consumer, UpdatedHashRanges updatedHashRanges, 
OperationType operationType);
+    public interface RemovedHashRangesProcessor {
+        void process(Consumer consumer, RemovedHashRanges removedHashRanges);
     }
 
-    private final Map<Consumer, UpdatedHashRanges> removedHashRanges;
-    private final Map<Consumer, UpdatedHashRanges> addedHashRanges;
+    private final Map<Consumer, RemovedHashRanges> removedHashRanges;
 
-    private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges> 
removedHashRanges,
-                                    Map<Consumer, UpdatedHashRanges> 
addedHashRanges) {
+    private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges> 
removedHashRanges) {
         this.removedHashRanges = removedHashRanges;
-        this.addedHashRanges = addedHashRanges;
     }
 
-    public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges> 
removedHashRanges,
-                                             Map<Consumer, UpdatedHashRanges> 
addedHashRanges) {
-        return new ImpactedConsumersResult(removedHashRanges, addedHashRanges);
+    public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges> 
removedHashRanges) {
+        return new ImpactedConsumersResult(removedHashRanges);
     }
 
-    public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor) 
{
-        removedHashRanges.forEach((c, r) -> processor.process(c, r, 
OperationType.REMOVE));
-        addedHashRanges.forEach((c, r) -> processor.process(c, r, 
OperationType.ADD));
+    public void processRemovedHashRanges(RemovedHashRangesProcessor processor) 
{
+        removedHashRanges.forEach((c, r) -> processor.process(c, r));
     }
 
     public boolean isEmpty() {
@@ -58,16 +53,7 @@ public class ImpactedConsumersResult {
     }
 
     @VisibleForTesting
-    Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() {
+    Map<Consumer, RemovedHashRanges> getRemovedHashRanges() {
         return removedHashRanges;
     }
-
-    @VisibleForTesting
-    Map<Consumer, UpdatedHashRanges> getAddedHashRanges() {
-        return addedHashRanges;
-    }
-
-    public enum OperationType {
-        ADD, REMOVE
-    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
similarity index 94%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
index e6f3e6709cf..d9bd502255b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
@@ -29,10 +29,10 @@ import org.apache.pulsar.client.api.Range;
  */
 @EqualsAndHashCode
 @ToString
-public class UpdatedHashRanges {
+public class RemovedHashRanges {
     private final Range[] sortedRanges;
 
-    private UpdatedHashRanges(List<Range> ranges) {
+    private RemovedHashRanges(List<Range> ranges) {
         // Converts the set of ranges to an array to avoid iterator allocation
         // when the ranges are iterator multiple times in the pending 
acknowledgments loop.
         this.sortedRanges = ranges.toArray(new Range[0]);
@@ -52,8 +52,8 @@ public class UpdatedHashRanges {
         }
     }
 
-    public static UpdatedHashRanges of(List<Range> ranges) {
-        return new UpdatedHashRanges(ranges);
+    public static RemovedHashRanges of(List<Range> ranges) {
+        return new RemovedHashRanges(ranges);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d803bc7d963..9e92a2ab40d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,25 +170,16 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private synchronized void registerDrainingHashes(Consumer skipConsumer,
                                                      ImpactedConsumersResult 
impactedConsumers) {
-        impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges, 
opType) -> {
+        impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
             if (c != skipConsumer) {
                 c.getPendingAcks().forEach((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
                     if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
                         log.warn("[{}] Sticky key hash was missing for {}:{}", 
getName(), ledgerId, entryId);
                         return;
                     }
-                    if (updatedHashRanges.containsStickyKey(stickyKeyHash)) {
-                        switch (opType) {
-                            //reduce ref count in case the stickyKeyHash was 
re-assigned to the original consumer
-                            case ADD -> {
-                                var entry = 
drainingHashesTracker.getEntry(stickyKeyHash);
-                                if (entry != null && entry.getConsumer() == c) 
{
-                                    drainingHashesTracker.reduceRefCount(c, 
stickyKeyHash, false);
-                                }
-                            }
-                            // add the pending ack to the draining hashes 
tracker if the hash is in the range
-                            case REMOVE -> drainingHashesTracker.addEntry(c, 
stickyKeyHash);
-                        }
+                    if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
+                        // add the pending ack to the draining hashes tracker 
if the hash is in the range
+                        drainingHashesTracker.addEntry(c, stickyKeyHash);
                     }
                 });
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index d02b9ca27a2..82891bfe9c4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -36,6 +36,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.mutable.MutableInt;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
 import org.assertj.core.data.Offset;
 import org.mockito.Mockito;
@@ -46,7 +47,7 @@ import org.testng.annotations.Test;
 public class ConsistentHashingStickyKeyConsumerSelectorTest {
 
     @Test
-    public void testConsumerSelect() {
+    public void testConsumerSelect() throws ConsumerAssignException {
 
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(200);
         String key1 = "anyKey";
@@ -150,7 +151,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest 
{
 
 
     @Test
-    public void testGetConsumerKeyHashRanges() {
+    public void testGetConsumerKeyHashRanges() throws 
BrokerServiceException.ConsumerAssignException {
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(3);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", 
"consumer3");
         List<Consumer> consumers = new ArrayList<>();
@@ -200,7 +201,8 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest 
{
     }
 
     @Test
-    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() {
+    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
+            throws BrokerServiceException.ConsumerAssignException {
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(200);
         List<Consumer> consumers = new ArrayList<>();
         for (int i = 0; i < 20; i++) {
@@ -528,24 +530,11 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
             selector.removeConsumer(consumer);
 
             ConsumerHashAssignmentsSnapshot assignmentsAfter = 
selector.getConsumerHashAssignmentsSnapshot();
-            ImpactedConsumersResult impactedConsumersAfterRemoval = 
assignmentsBefore
-                    .resolveImpactedConsumers(assignmentsAfter);
-            assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges())
+            
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges())
                     .describedAs(
                             "when a consumer is removed, the removed hash 
ranges should only be from "
                                     + "the removed consumer")
                     .containsOnlyKeys(consumer);
-            List<Range> allAddedRangesAfterRemoval = 
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                    
impactedConsumersAfterRemoval.getAddedHashRanges().values().stream()
-                            
.map(UpdatedHashRanges::asRanges).flatMap(List::stream)
-                            .collect(Collectors.toCollection(TreeSet::new))
-            );
-            assertThat(allAddedRangesAfterRemoval)
-                    .describedAs(
-                            "when a consumer is removed, all its hash ranges 
should appear "
-                                    + "in added hash ranges"
-                    )
-                    
.containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer));
             assignmentsBefore = assignmentsAfter;
 
             // add consumer back
@@ -554,9 +543,8 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest 
{
             assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
             List<Range> addedConsumerRanges = 
assignmentsAfter.getRangesByConsumer().get(consumer);
 
-            ImpactedConsumersResult impactedConsumersAfterAdding = 
assignmentsBefore
-                    .resolveImpactedConsumers(assignmentsAfter);
-            Map<Consumer, UpdatedHashRanges> removedHashRanges = 
impactedConsumersAfterAdding.getRemovedHashRanges();
+            Map<Consumer, RemovedHashRanges> removedHashRanges =
+                    
assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
             ConsumerHashAssignmentsSnapshot finalAssignmentsBefore = 
assignmentsBefore;
             assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> {
                 assertThat(removedHashRange
@@ -570,19 +558,12 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
 
             List<Range> allRemovedRanges =
                     ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                            removedHashRanges.values().stream()
-                                    .map(UpdatedHashRanges::asRanges)
+                            
removedHashRanges.entrySet().stream().map(Map.Entry::getValue)
+                                    .map(RemovedHashRanges::asRanges)
                                     
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
             assertThat(allRemovedRanges)
                     .describedAs("all removed ranges should be the same as the 
ranges of the added consumer")
                     .containsExactlyElementsOf(addedConsumerRanges);
-            List<Range> allAddedRangesAfterAdding = 
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-                    
impactedConsumersAfterAdding.getAddedHashRanges().values().stream()
-                            .map(UpdatedHashRanges::asRanges)
-                            
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
-            assertThat(addedConsumerRanges)
-                    .describedAs("all added ranges should be the same as the 
ranges of the added consumer")
-                    .containsExactlyElementsOf(allAddedRangesAfterAdding);
 
             assignmentsBefore = assignmentsAfter;
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
index 93e53aba7b3..5c886b6eec9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
@@ -129,16 +129,14 @@ public class ConsumerHashAssignmentsSnapshotTest {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
-        Range notModifiedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
-        mappingBefore.add(new HashRangeAssignment(notModifiedRange, 
consumer1));
-        mappingAfter.add(new HashRangeAssignment(notModifiedRange, consumer1));
+        mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+        mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
 
         assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
-        assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
     }
 
     @Test
@@ -146,59 +144,49 @@ public class ConsumerHashAssignmentsSnapshotTest {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
-        Range reAssignedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
         Consumer consumer2 = createMockConsumer("consumer2");
-        mappingBefore.add(new HashRangeAssignment(reAssignedRange, consumer1));
-        mappingAfter.add(new HashRangeAssignment(reAssignedRange, consumer2));
+        mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+        mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer2));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
 
         
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, 
UpdatedHashRanges.of(List.of(reAssignedRange))));
-        
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer2, 
UpdatedHashRanges.of(List.of(reAssignedRange)))
-        );
+                Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 
5)))));
     }
 
     @Test
-    public void testResolveConsumerUpdatedHashRanges_RangeAdded() {
+    public void testResolveConsumerRemovedHashRanges_RangeAdded() {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
-        Range addedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
-        mappingAfter.add(new HashRangeAssignment(addedRange, consumer1));
+        mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
 
         assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
-        
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, UpdatedHashRanges.of(List.of(addedRange)))
-        );
     }
 
     @Test
-    public void testResolveConsumerRemovedHashRanges_RangeUpdated() {
+    public void testResolveConsumerRemovedHashRanges_RangeRemoved() {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
-        Range removedRange = Range.of(1, 5);
         Consumer consumer1 = createMockConsumer("consumer1");
-        mappingBefore.add(new HashRangeAssignment(removedRange, consumer1));
+        mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
 
         
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, 
UpdatedHashRanges.of(List.of(removedRange))));
-        assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
+                Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 
5)))));
     }
 
     @Test
-    public void testResolveConsumerUpdatedHashRanges_OverlappingRanges() {
+    public void testResolveConsumerRemovedHashRanges_OverlappingRanges() {
         List<HashRangeAssignment> mappingBefore = new ArrayList<>();
         List<HashRangeAssignment> mappingAfter = new ArrayList<>();
 
@@ -208,12 +196,9 @@ public class ConsumerHashAssignmentsSnapshotTest {
         mappingAfter.add(new HashRangeAssignment(Range.of(3, 7), consumer2));
 
         ImpactedConsumersResult impactedConsumers =
-                
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, 
mappingAfter);
+                
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, 
mappingAfter);
 
         
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer1, UpdatedHashRanges.of(List.of(Range.of(3, 
5)))));
-        
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
-                Map.of(consumer2, UpdatedHashRanges.of(List.of(Range.of(3, 
7))))
-        );
+                Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(3, 
5)))));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 80b5070077d..af7c32192c7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,7 +30,6 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Duration;
@@ -920,136 +919,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
-    /*
-        Verifies https://github.com/apache/pulsar/issues/23845 issue fixed
-
-        The scenario of verification
-
-        Preconditions:
-
-        1. Consumer "consumer" up and running
-        2. Producer "producer" connected to broker
-
-        Steps to reproduce:
-        1. "producer" send message with key "testMessageKey"
-        Example: "testMessage" hash is always 124
-
-        2. "consumer" receives BUT DO NOT acknowledge message with key 
"testMessageKey"
-        Example: "consumer" hash range: [0-256]. "testMessage" hash range: 
[0-256]
-
-        3. Add more consumers until hash range of message with key 
"testMessageKey" removed from "consumer" hash ranges
-        Example: "consumer" hash range: [0-123]. "consumer N" hash range: 
[124-136]. "testMessage" hash range: [124-136]
-
-        4. Stop the added consumers until message key "testMessageKey" hash 
range moved to "consumer" hash ranges
-        Example: "consumer" hash range: [0-128]. "testMessage" hash range: 
[0-128]
-
-        5. Add more consumers until hash range of message with key 
"testMessageKey" removed from "consumer" hash ranges
-        Example: "consumer" hash range: [0-96]. "consumer N" hash range: 
[121-154]. "testMessage" hash range: [121-154]
-
-        6. Stop "consumer"
-
-        Actual result for bug:
-        No message with key "testMessageKey" received by consumer who owns 
message's hash range
-
-        Expected result:
-        The message with key "testMessageKey" received by consumer who owns 
message's hash range
-        Example: "consumer N" with hash range [121-154] received the message
-
-     */
-    @Test
-    void testMessageDeliveredFromDrainingHashes() throws PulsarClientException 
{
-        String messageKey = "testMessageKey";
-        String topic = "testMessageDeliveredFromDrainingHashes" + 
UUID.randomUUID();
-
-        List<Consumer<Integer>> consumers = new ArrayList<>();
-
-        @Cleanup
-        Consumer<Integer> consumer = createConsumer(topic);
-        String initialOwnerName = consumer.getConsumerName();
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic, false);
-        producer.newMessage().key(messageKey).value(1).send();
-
-        // receive but not acknowledge the message to leave it in pending acks
-        Message<Integer> received = consumer.receive();
-        assertThat(received.getKey()).isEqualTo(messageKey);
-
-        StickyKeyDispatcher dispatcher = getDispatcher(topic, 
SUBSCRIPTION_NAME);
-        StickyKeyConsumerSelector selector = dispatcher.getSelector();
-        int messageKeyHash = 
selector.makeStickyKeyHash(messageKey.getBytes(StandardCharsets.UTF_8));
-
-        try {
-            // add new consumers until hash range of the initial message owner 
moved
-            Pair<String, List<Consumer<Integer>>> addedConsumers = 
addConsumersUntilOwnerChanged(
-                    topic, initialOwnerName, messageKeyHash, selector
-            );
-            String currentOwnerName = addedConsumers.getKey();
-            consumers.addAll(addedConsumers.getValue());
-
-            // returning hash range to the initial owner
-            for (int i = consumers.size() - 1; i > -1 && 
!initialOwnerName.equals(currentOwnerName); i--) {
-                consumers.remove(i).close();
-                currentOwnerName = findOwnerName(selector, messageKeyHash);
-            }
-
-            // add new consumers until hash range of the initial message owner 
moved
-            Pair<String, List<Consumer<Integer>>> addedConsumersAfter = 
addConsumersUntilOwnerChanged(
-                    topic, initialOwnerName, messageKeyHash, selector
-            );
-
-            String currentOwnerNameAfter = addedConsumersAfter.getKey();
-            consumers.addAll(addedConsumersAfter.getValue());
-
-            // remove the initial owner
-            consumer.close();
-
-            // verify the message sent to new consumer which owns hash range
-            Optional<Consumer<Integer>> theNewOwnerMaybe = consumers
-                    .stream()
-                    .filter(c -> 
c.getConsumerName().equals(currentOwnerNameAfter))
-                    .findAny();
-
-            assertThat(theNewOwnerMaybe).isPresent();
-
-            Consumer<Integer> theNewOwner = theNewOwnerMaybe.get();
-
-            Message<Integer> message = theNewOwner.receive(5, 
TimeUnit.SECONDS);
-            assertThat(message).describedAs("Message with key " + messageKey
-                    + " expected to be delivered to consumer " + 
theNewOwner.getConsumerName()).isNotNull();
-            assertThat(message.getKey()).isEqualTo(messageKey);
-        } finally {
-            for (Consumer<Integer> c : consumers) {
-                c.close();
-            }
-        }
-    }
-
-    private Pair<String, List<Consumer<Integer>>> 
addConsumersUntilOwnerChanged(
-            String topic,
-            String initialOwnerName,
-            int messageKeyHash,
-            StickyKeyConsumerSelector selector
-    ) throws PulsarClientException {
-        String currentOwnerName;
-        List<Consumer<Integer>> consumers = new ArrayList<>();
-        do {
-            Consumer<Integer> addedC = createConsumer(topic);
-            consumers.add(addedC);
-            currentOwnerName = findOwnerName(selector, messageKeyHash);
-        } while (initialOwnerName.equals(currentOwnerName));
-        return Pair.of(currentOwnerName, consumers);
-    }
-
-    private String findOwnerName(StickyKeyConsumerSelector selector, int hash) 
{
-        return selector.getConsumerKeyHashRanges().entrySet().stream()
-                .filter(entry ->
-                        entry.getValue().stream().anyMatch(range -> 
range.contains(hash))
-                )
-                .findAny().orElseThrow(() -> new IllegalArgumentException("No 
owner for the hash " + hash))
-                .getKey().consumerName();
-    }
-
     @Test(dataProvider = "currentImplementationType")
     public void testAttachKeyToMessageMetadata(KeySharedImplementationType 
impl) throws PulsarClientException {
         String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();

Reply via email to