This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new ca8d33835db [fix][broker] Key_Shared subscription doesn't always
deliver messages from the replay queue after a consumer disconnects and leaves
a backlog (#24732)
ca8d33835db is described below
commit ca8d33835dba2b6200c68838af13a8cced41282b
Author: nborisov <[email protected]>
AuthorDate: Fri Sep 12 12:30:49 2025 +0300
[fix][broker] Key_Shared subscription doesn't always deliver messages from
the replay queue after a consumer disconnects and leaves a backlog (#24732)
Co-authored-by: Nikolai Borisov <[email protected]>
---
.../service/ConsumerHashAssignmentsSnapshot.java | 44 ++++---
.../broker/service/DrainingHashesTracker.java | 8 ++
.../broker/service/ImpactedConsumersResult.java | 32 +++--
...movedHashRanges.java => UpdatedHashRanges.java} | 8 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 17 ++-
...istentHashingStickyKeyConsumerSelectorTest.java | 39 ++++--
.../ConsumerHashAssignmentsSnapshotTest.java | 49 +++++---
.../client/api/KeySharedSubscriptionTest.java | 131 +++++++++++++++++++++
8 files changed, 266 insertions(+), 62 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
index 18ea927c642..0607711826b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
@@ -72,7 +72,7 @@ public class ConsumerHashAssignmentsSnapshot {
}
public ImpactedConsumersResult
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
- return resolveConsumerRemovedHashRanges(this.hashRangeAssignments,
assignmentsAfter.hashRangeAssignments);
+ return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments,
assignmentsAfter.hashRangeAssignments);
}
/**
@@ -111,28 +111,36 @@ public class ConsumerHashAssignmentsSnapshot {
* @param mappingAfter the range mapping after the change
* @return consumers and ranges where the existing range changed
*/
- static ImpactedConsumersResult
resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore,
+ static ImpactedConsumersResult
resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore,
List<HashRangeAssignment> mappingAfter) {
Map<Range, Pair<Consumer, Consumer>> impactedRanges =
diffRanges(mappingBefore, mappingAfter);
- Map<Consumer, SortedSet<Range>> removedRangesByConsumer =
impactedRanges.entrySet().stream()
- .collect(IdentityHashMap::new, (resultMap, entry) -> {
- Range range = entry.getKey();
- // filter out only where the range was removed
- Consumer consumerBefore = entry.getValue().getLeft();
- if (consumerBefore != null) {
- resultMap.computeIfAbsent(consumerBefore, k -> new
TreeSet<>()).add(range);
- }
- }, IdentityHashMap::putAll);
- return
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
+ Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new
IdentityHashMap<>();
+ Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new
IdentityHashMap<>();
+ impactedRanges.forEach((range, value) -> {
+ Consumer consumerAfter = value.getRight();
+
+ // the range is owned by a consumer after the change
+ if (consumerAfter != null) {
+ addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new
TreeSet<>()).add(range);
+ }
+ // filter out only where the range was removed
+ Consumer consumerBefore = value.getLeft();
+ if (consumerBefore != null) {
+ removedRangesByConsumer.computeIfAbsent(consumerBefore, k ->
new TreeSet<>()).add(range);
+ }
+ });
+ var removedMerged =
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
+ var addedMerged =
mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer);
+ return ImpactedConsumersResult.of(removedMerged, addedMerged);
}
- static ImpactedConsumersResult
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
- Map<Consumer, SortedSet<Range>> removedRangesByConsumer) {
- Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new
IdentityHashMap<>();
- removedRangesByConsumer.forEach((consumer, ranges) -> {
- mergedRangesByConsumer.put(consumer,
RemovedHashRanges.of(mergeOverlappingRanges(ranges)));
+ static Map<Consumer, UpdatedHashRanges>
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
+ Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) {
+ Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new
IdentityHashMap<>();
+ updatedRangesByConsumer.forEach((consumer, ranges) -> {
+ mergedRangesByConsumer.put(consumer,
UpdatedHashRanges.of(mergeOverlappingRanges(ranges)));
});
- return ImpactedConsumersResult.of(mergedRangesByConsumer);
+ return mergedRangesByConsumer;
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 9bc5c5f1e44..51c45817368 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -400,6 +400,14 @@ public class DrainingHashesTracker {
} finally {
lock.writeLock().unlock();
}
+
+ // update the consumer specific stats
+ ConsumerDrainingHashesStats drainingHashesStats =
+ consumerDrainingHashesStatsMap.get(new
ConsumerIdentityWrapper(consumer));
+ if (drainingHashesStats != null) {
+ drainingHashesStats.clearHash(stickyKeyHash);
+ }
+
return false;
}
// increment the blocked count which is used to determine if the hash
is blocking
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
index a525b0501d7..bb888db1ca3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
@@ -30,22 +30,27 @@ import lombok.ToString;
@EqualsAndHashCode
@ToString
public class ImpactedConsumersResult {
- public interface RemovedHashRangesProcessor {
- void process(Consumer consumer, RemovedHashRanges removedHashRanges);
+ public interface UpdatedHashRangesProcessor {
+ void process(Consumer consumer, UpdatedHashRanges updatedHashRanges,
OperationType operationType);
}
- private final Map<Consumer, RemovedHashRanges> removedHashRanges;
+ private final Map<Consumer, UpdatedHashRanges> removedHashRanges;
+ private final Map<Consumer, UpdatedHashRanges> addedHashRanges;
- private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges>
removedHashRanges) {
+ private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges>
removedHashRanges,
+ Map<Consumer, UpdatedHashRanges>
addedHashRanges) {
this.removedHashRanges = removedHashRanges;
+ this.addedHashRanges = addedHashRanges;
}
- public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges>
removedHashRanges) {
- return new ImpactedConsumersResult(removedHashRanges);
+ public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges>
removedHashRanges,
+ Map<Consumer, UpdatedHashRanges>
addedHashRanges) {
+ return new ImpactedConsumersResult(removedHashRanges, addedHashRanges);
}
- public void processRemovedHashRanges(RemovedHashRangesProcessor processor)
{
- removedHashRanges.forEach((c, r) -> processor.process(c, r));
+ public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor)
{
+ removedHashRanges.forEach((c, r) -> processor.process(c, r,
OperationType.REMOVE));
+ addedHashRanges.forEach((c, r) -> processor.process(c, r,
OperationType.ADD));
}
public boolean isEmpty() {
@@ -53,7 +58,16 @@ public class ImpactedConsumersResult {
}
@VisibleForTesting
- Map<Consumer, RemovedHashRanges> getRemovedHashRanges() {
+ Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() {
return removedHashRanges;
}
+
+ @VisibleForTesting
+ Map<Consumer, UpdatedHashRanges> getAddedHashRanges() {
+ return addedHashRanges;
+ }
+
+ public enum OperationType {
+ ADD, REMOVE
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
similarity index 94%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
index d9bd502255b..e6f3e6709cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
@@ -29,10 +29,10 @@ import org.apache.pulsar.client.api.Range;
*/
@EqualsAndHashCode
@ToString
-public class RemovedHashRanges {
+public class UpdatedHashRanges {
private final Range[] sortedRanges;
- private RemovedHashRanges(List<Range> ranges) {
+ private UpdatedHashRanges(List<Range> ranges) {
// Converts the set of ranges to an array to avoid iterator allocation
// when the ranges are iterator multiple times in the pending
acknowledgments loop.
this.sortedRanges = ranges.toArray(new Range[0]);
@@ -52,8 +52,8 @@ public class RemovedHashRanges {
}
}
- public static RemovedHashRanges of(List<Range> ranges) {
- return new RemovedHashRanges(ranges);
+ public static UpdatedHashRanges of(List<Range> ranges) {
+ return new UpdatedHashRanges(ranges);
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 9e92a2ab40d..d803bc7d963 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,16 +170,25 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private synchronized void registerDrainingHashes(Consumer skipConsumer,
ImpactedConsumersResult
impactedConsumers) {
- impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
+ impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges,
opType) -> {
if (c != skipConsumer) {
c.getPendingAcks().forEach((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
log.warn("[{}] Sticky key hash was missing for {}:{}",
getName(), ledgerId, entryId);
return;
}
- if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
- // add the pending ack to the draining hashes tracker
if the hash is in the range
- drainingHashesTracker.addEntry(c, stickyKeyHash);
+ if (updatedHashRanges.containsStickyKey(stickyKeyHash)) {
+ switch (opType) {
+ //reduce ref count in case the stickyKeyHash was
re-assigned to the original consumer
+ case ADD -> {
+ var entry =
drainingHashesTracker.getEntry(stickyKeyHash);
+ if (entry != null && entry.getConsumer() == c)
{
+ drainingHashesTracker.reduceRefCount(c,
stickyKeyHash, false);
+ }
+ }
+ // add the pending ack to the draining hashes
tracker if the hash is in the range
+ case REMOVE -> drainingHashesTracker.addEntry(c,
stickyKeyHash);
+ }
}
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 82891bfe9c4..d02b9ca27a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -36,7 +36,6 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableInt;
-import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.assertj.core.data.Offset;
import org.mockito.Mockito;
@@ -47,7 +46,7 @@ import org.testng.annotations.Test;
public class ConsistentHashingStickyKeyConsumerSelectorTest {
@Test
- public void testConsumerSelect() throws ConsumerAssignException {
+ public void testConsumerSelect() {
ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(200);
String key1 = "anyKey";
@@ -151,7 +150,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest
{
@Test
- public void testGetConsumerKeyHashRanges() throws
BrokerServiceException.ConsumerAssignException {
+ public void testGetConsumerKeyHashRanges() {
ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(3);
List<String> consumerName = Arrays.asList("consumer1", "consumer2",
"consumer3");
List<Consumer> consumers = new ArrayList<>();
@@ -201,8 +200,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest
{
}
@Test
- public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
- throws BrokerServiceException.ConsumerAssignException {
+ public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() {
ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(200);
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 20; i++) {
@@ -530,11 +528,24 @@ public class
ConsistentHashingStickyKeyConsumerSelectorTest {
selector.removeConsumer(consumer);
ConsumerHashAssignmentsSnapshot assignmentsAfter =
selector.getConsumerHashAssignmentsSnapshot();
-
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges())
+ ImpactedConsumersResult impactedConsumersAfterRemoval =
assignmentsBefore
+ .resolveImpactedConsumers(assignmentsAfter);
+ assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges())
.describedAs(
"when a consumer is removed, the removed hash
ranges should only be from "
+ "the removed consumer")
.containsOnlyKeys(consumer);
+ List<Range> allAddedRangesAfterRemoval =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
+
impactedConsumersAfterRemoval.getAddedHashRanges().values().stream()
+
.map(UpdatedHashRanges::asRanges).flatMap(List::stream)
+ .collect(Collectors.toCollection(TreeSet::new))
+ );
+ assertThat(allAddedRangesAfterRemoval)
+ .describedAs(
+ "when a consumer is removed, all its hash ranges
should appear "
+ + "in added hash ranges"
+ )
+
.containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer));
assignmentsBefore = assignmentsAfter;
// add consumer back
@@ -543,8 +554,9 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest
{
assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
List<Range> addedConsumerRanges =
assignmentsAfter.getRangesByConsumer().get(consumer);
- Map<Consumer, RemovedHashRanges> removedHashRanges =
-
assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
+ ImpactedConsumersResult impactedConsumersAfterAdding =
assignmentsBefore
+ .resolveImpactedConsumers(assignmentsAfter);
+ Map<Consumer, UpdatedHashRanges> removedHashRanges =
impactedConsumersAfterAdding.getRemovedHashRanges();
ConsumerHashAssignmentsSnapshot finalAssignmentsBefore =
assignmentsBefore;
assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> {
assertThat(removedHashRange
@@ -558,12 +570,19 @@ public class
ConsistentHashingStickyKeyConsumerSelectorTest {
List<Range> allRemovedRanges =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-
removedHashRanges.entrySet().stream().map(Map.Entry::getValue)
- .map(RemovedHashRanges::asRanges)
+ removedHashRanges.values().stream()
+ .map(UpdatedHashRanges::asRanges)
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
assertThat(allRemovedRanges)
.describedAs("all removed ranges should be the same as the
ranges of the added consumer")
.containsExactlyElementsOf(addedConsumerRanges);
+ List<Range> allAddedRangesAfterAdding =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
+
impactedConsumersAfterAdding.getAddedHashRanges().values().stream()
+ .map(UpdatedHashRanges::asRanges)
+
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
+ assertThat(addedConsumerRanges)
+ .describedAs("all added ranges should be the same as the
ranges of the added consumer")
+ .containsExactlyElementsOf(allAddedRangesAfterAdding);
assignmentsBefore = assignmentsAfter;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
index 5c886b6eec9..93e53aba7b3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
@@ -129,14 +129,16 @@ public class ConsumerHashAssignmentsSnapshotTest {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
+ Range notModifiedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
- mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
- mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+ mappingBefore.add(new HashRangeAssignment(notModifiedRange,
consumer1));
+ mappingAfter.add(new HashRangeAssignment(notModifiedRange, consumer1));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
+ assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
}
@Test
@@ -144,49 +146,59 @@ public class ConsumerHashAssignmentsSnapshotTest {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
+ Range reAssignedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
Consumer consumer2 = createMockConsumer("consumer2");
- mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
- mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer2));
+ mappingBefore.add(new HashRangeAssignment(reAssignedRange, consumer1));
+ mappingAfter.add(new HashRangeAssignment(reAssignedRange, consumer2));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1,
5)))));
+ Map.of(consumer1,
UpdatedHashRanges.of(List.of(reAssignedRange))));
+
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
+ Map.of(consumer2,
UpdatedHashRanges.of(List.of(reAssignedRange)))
+ );
}
@Test
- public void testResolveConsumerRemovedHashRanges_RangeAdded() {
+ public void testResolveConsumerUpdatedHashRanges_RangeAdded() {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
+ Range addedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
- mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+ mappingAfter.add(new HashRangeAssignment(addedRange, consumer1));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
+
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
+ Map.of(consumer1, UpdatedHashRanges.of(List.of(addedRange)))
+ );
}
@Test
- public void testResolveConsumerRemovedHashRanges_RangeRemoved() {
+ public void testResolveConsumerRemovedHashRanges_RangeUpdated() {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
+ Range removedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
- mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+ mappingBefore.add(new HashRangeAssignment(removedRange, consumer1));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1,
5)))));
+ Map.of(consumer1,
UpdatedHashRanges.of(List.of(removedRange))));
+ assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
}
@Test
- public void testResolveConsumerRemovedHashRanges_OverlappingRanges() {
+ public void testResolveConsumerUpdatedHashRanges_OverlappingRanges() {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
@@ -196,9 +208,12 @@ public class ConsumerHashAssignmentsSnapshotTest {
mappingAfter.add(new HashRangeAssignment(Range.of(3, 7), consumer2));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(3,
5)))));
+ Map.of(consumer1, UpdatedHashRanges.of(List.of(Range.of(3,
5)))));
+
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
+ Map.of(consumer2, UpdatedHashRanges.of(List.of(Range.of(3,
7))))
+ );
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index af7c32192c7..80b5070077d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,6 +30,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
@@ -919,6 +920,136 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
consumer.close();
}
+ /*
+ Verifies that https://github.com/apache/pulsar/issues/23845 is fixed.
+
+ The scenario of verification
+
+ Preconditions:
+
+ 1. Consumer "consumer" up and running
+ 2. Producer "producer" connected to broker
+
+ Steps to reproduce:
+ 1. "producer" send message with key "testMessageKey"
+ Example: "testMessage" hash is always 124
+
+ 2. "consumer" receives BUT DOES NOT acknowledge the message with key
"testMessageKey"
+ Example: "consumer" hash range: [0-256]. "testMessage" hash range:
[0-256]
+
+ 3. Add more consumers until hash range of message with key
"testMessageKey" removed from "consumer" hash ranges
+ Example: "consumer" hash range: [0-123]. "consumer N" hash range:
[124-136]. "testMessage" hash range: [124-136]
+
+ 4. Stop the added consumers until message key "testMessageKey" hash
range moved to "consumer" hash ranges
+ Example: "consumer" hash range: [0-128]. "testMessage" hash range:
[0-128]
+
+ 5. Add more consumers until hash range of message with key
"testMessageKey" removed from "consumer" hash ranges
+ Example: "consumer" hash range: [0-96]. "consumer N" hash range:
[121-154]. "testMessage" hash range: [121-154]
+
+ 6. Stop "consumer"
+
+ Actual result for bug:
+ No message with key "testMessageKey" received by consumer who owns
message's hash range
+
+ Expected result:
+ The message with key "testMessageKey" received by consumer who owns
message's hash range
+ Example: "consumer N" with hash range [121-154] received the message
+
+ */
+ @Test
+ void testMessageDeliveredFromDrainingHashes() throws PulsarClientException
{
+ String messageKey = "testMessageKey";
+ String topic = "testMessageDeliveredFromDrainingHashes" +
UUID.randomUUID();
+
+ List<Consumer<Integer>> consumers = new ArrayList<>();
+
+ @Cleanup
+ Consumer<Integer> consumer = createConsumer(topic);
+ String initialOwnerName = consumer.getConsumerName();
+
+ @Cleanup
+ Producer<Integer> producer = createProducer(topic, false);
+ producer.newMessage().key(messageKey).value(1).send();
+
+ // receive but not acknowledge the message to leave it in pending acks
+ Message<Integer> received = consumer.receive();
+ assertThat(received.getKey()).isEqualTo(messageKey);
+
+ StickyKeyDispatcher dispatcher = getDispatcher(topic,
SUBSCRIPTION_NAME);
+ StickyKeyConsumerSelector selector = dispatcher.getSelector();
+ int messageKeyHash =
selector.makeStickyKeyHash(messageKey.getBytes(StandardCharsets.UTF_8));
+
+ try {
+ // add new consumers until hash range of the initial message owner
moved
+ Pair<String, List<Consumer<Integer>>> addedConsumers =
addConsumersUntilOwnerChanged(
+ topic, initialOwnerName, messageKeyHash, selector
+ );
+ String currentOwnerName = addedConsumers.getKey();
+ consumers.addAll(addedConsumers.getValue());
+
+ // returning hash range to the initial owner
+ for (int i = consumers.size() - 1; i > -1 &&
!initialOwnerName.equals(currentOwnerName); i--) {
+ consumers.remove(i).close();
+ currentOwnerName = findOwnerName(selector, messageKeyHash);
+ }
+
+ // add new consumers until hash range of the initial message owner
moved
+ Pair<String, List<Consumer<Integer>>> addedConsumersAfter =
addConsumersUntilOwnerChanged(
+ topic, initialOwnerName, messageKeyHash, selector
+ );
+
+ String currentOwnerNameAfter = addedConsumersAfter.getKey();
+ consumers.addAll(addedConsumersAfter.getValue());
+
+ // remove the initial owner
+ consumer.close();
+
+ // verify the message sent to new consumer which owns hash range
+ Optional<Consumer<Integer>> theNewOwnerMaybe = consumers
+ .stream()
+ .filter(c ->
c.getConsumerName().equals(currentOwnerNameAfter))
+ .findAny();
+
+ assertThat(theNewOwnerMaybe).isPresent();
+
+ Consumer<Integer> theNewOwner = theNewOwnerMaybe.get();
+
+ Message<Integer> message = theNewOwner.receive(5,
TimeUnit.SECONDS);
+ assertThat(message).describedAs("Message with key " + messageKey
+ + " expected to be delivered to consumer " +
theNewOwner.getConsumerName()).isNotNull();
+ assertThat(message.getKey()).isEqualTo(messageKey);
+ } finally {
+ for (Consumer<Integer> c : consumers) {
+ c.close();
+ }
+ }
+ }
+
+ private Pair<String, List<Consumer<Integer>>>
addConsumersUntilOwnerChanged(
+ String topic,
+ String initialOwnerName,
+ int messageKeyHash,
+ StickyKeyConsumerSelector selector
+ ) throws PulsarClientException {
+ String currentOwnerName;
+ List<Consumer<Integer>> consumers = new ArrayList<>();
+ do {
+ Consumer<Integer> addedC = createConsumer(topic);
+ consumers.add(addedC);
+ currentOwnerName = findOwnerName(selector, messageKeyHash);
+ } while (initialOwnerName.equals(currentOwnerName));
+ return Pair.of(currentOwnerName, consumers);
+ }
+
+ private String findOwnerName(StickyKeyConsumerSelector selector, int hash)
{
+ return selector.getConsumerKeyHashRanges().entrySet().stream()
+ .filter(entry ->
+ entry.getValue().stream().anyMatch(range ->
range.contains(hash))
+ )
+ .findAny().orElseThrow(() -> new IllegalArgumentException("No
owner for the hash " + hash))
+ .getKey().consumerName();
+ }
+
@Test(dataProvider = "currentImplementationType")
public void testAttachKeyToMessageMetadata(KeySharedImplementationType
impl) throws PulsarClientException {
String topic = "persistent://public/default/key_shared-" +
UUID.randomUUID();