This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3465d2a1064 Revert "[fix][broker] Key_Shared subscription doesn't
always deliver messages from the replay queue after a consumer disconnects and
leaves a backlog" (#24735)
3465d2a1064 is described below
commit 3465d2a1064b3c9a02c8475d368f33c3f9d4b847
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Sep 12 12:34:23 2025 +0300
Revert "[fix][broker] Key_Shared subscription doesn't always deliver
messages from the replay queue after a consumer disconnects and leaves a
backlog" (#24735)
The PR should target the master branch.
---
.../service/ConsumerHashAssignmentsSnapshot.java | 44 +++----
.../broker/service/DrainingHashesTracker.java | 8 --
.../broker/service/ImpactedConsumersResult.java | 32 ++---
...datedHashRanges.java => RemovedHashRanges.java} | 8 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 17 +--
...istentHashingStickyKeyConsumerSelectorTest.java | 39 ++----
.../ConsumerHashAssignmentsSnapshotTest.java | 49 +++-----
.../client/api/KeySharedSubscriptionTest.java | 131 ---------------------
8 files changed, 62 insertions(+), 266 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
index 0607711826b..18ea927c642 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
@@ -72,7 +72,7 @@ public class ConsumerHashAssignmentsSnapshot {
}
public ImpactedConsumersResult
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
- return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments,
assignmentsAfter.hashRangeAssignments);
+ return resolveConsumerRemovedHashRanges(this.hashRangeAssignments,
assignmentsAfter.hashRangeAssignments);
}
/**
@@ -111,36 +111,28 @@ public class ConsumerHashAssignmentsSnapshot {
* @param mappingAfter the range mapping after the change
* @return consumers and ranges where the existing range changed
*/
- static ImpactedConsumersResult
resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore,
+ static ImpactedConsumersResult
resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore,
List<HashRangeAssignment> mappingAfter) {
Map<Range, Pair<Consumer, Consumer>> impactedRanges =
diffRanges(mappingBefore, mappingAfter);
- Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new
IdentityHashMap<>();
- Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new
IdentityHashMap<>();
- impactedRanges.forEach((range, value) -> {
- Consumer consumerAfter = value.getRight();
-
- // last consumer was removed
- if (consumerAfter != null) {
- addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new
TreeSet<>()).add(range);
- }
- // filter out only where the range was removed
- Consumer consumerBefore = value.getLeft();
- if (consumerBefore != null) {
- removedRangesByConsumer.computeIfAbsent(consumerBefore, k ->
new TreeSet<>()).add(range);
- }
- });
- var removedMerged =
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
- var addedMerged =
mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer);
- return ImpactedConsumersResult.of(removedMerged, addedMerged);
+ Map<Consumer, SortedSet<Range>> removedRangesByConsumer =
impactedRanges.entrySet().stream()
+ .collect(IdentityHashMap::new, (resultMap, entry) -> {
+ Range range = entry.getKey();
+ // filter out only where the range was removed
+ Consumer consumerBefore = entry.getValue().getLeft();
+ if (consumerBefore != null) {
+ resultMap.computeIfAbsent(consumerBefore, k -> new
TreeSet<>()).add(range);
+ }
+ }, IdentityHashMap::putAll);
+ return
mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer);
}
- static Map<Consumer, UpdatedHashRanges>
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
- Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) {
- Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new
IdentityHashMap<>();
- updatedRangesByConsumer.forEach((consumer, ranges) -> {
- mergedRangesByConsumer.put(consumer,
UpdatedHashRanges.of(mergeOverlappingRanges(ranges)));
+ static ImpactedConsumersResult
mergedOverlappingRangesAndConvertToImpactedConsumersResult(
+ Map<Consumer, SortedSet<Range>> removedRangesByConsumer) {
+ Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new
IdentityHashMap<>();
+ removedRangesByConsumer.forEach((consumer, ranges) -> {
+ mergedRangesByConsumer.put(consumer,
RemovedHashRanges.of(mergeOverlappingRanges(ranges)));
});
- return mergedRangesByConsumer;
+ return ImpactedConsumersResult.of(mergedRangesByConsumer);
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 51c45817368..9bc5c5f1e44 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -400,14 +400,6 @@ public class DrainingHashesTracker {
} finally {
lock.writeLock().unlock();
}
-
- // update the consumer specific stats
- ConsumerDrainingHashesStats drainingHashesStats =
- consumerDrainingHashesStatsMap.get(new
ConsumerIdentityWrapper(consumer));
- if (drainingHashesStats != null) {
- drainingHashesStats.clearHash(stickyKeyHash);
- }
-
return false;
}
// increment the blocked count which is used to determine if the hash
is blocking
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
index bb888db1ca3..a525b0501d7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java
@@ -30,27 +30,22 @@ import lombok.ToString;
@EqualsAndHashCode
@ToString
public class ImpactedConsumersResult {
- public interface UpdatedHashRangesProcessor {
- void process(Consumer consumer, UpdatedHashRanges updatedHashRanges,
OperationType operationType);
+ public interface RemovedHashRangesProcessor {
+ void process(Consumer consumer, RemovedHashRanges removedHashRanges);
}
- private final Map<Consumer, UpdatedHashRanges> removedHashRanges;
- private final Map<Consumer, UpdatedHashRanges> addedHashRanges;
+ private final Map<Consumer, RemovedHashRanges> removedHashRanges;
- private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges>
removedHashRanges,
- Map<Consumer, UpdatedHashRanges>
addedHashRanges) {
+ private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges>
removedHashRanges) {
this.removedHashRanges = removedHashRanges;
- this.addedHashRanges = addedHashRanges;
}
- public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges>
removedHashRanges,
- Map<Consumer, UpdatedHashRanges>
addedHashRanges) {
- return new ImpactedConsumersResult(removedHashRanges, addedHashRanges);
+ public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges>
removedHashRanges) {
+ return new ImpactedConsumersResult(removedHashRanges);
}
- public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor)
{
- removedHashRanges.forEach((c, r) -> processor.process(c, r,
OperationType.REMOVE));
- addedHashRanges.forEach((c, r) -> processor.process(c, r,
OperationType.ADD));
+ public void processRemovedHashRanges(RemovedHashRangesProcessor processor)
{
+ removedHashRanges.forEach((c, r) -> processor.process(c, r));
}
public boolean isEmpty() {
@@ -58,16 +53,7 @@ public class ImpactedConsumersResult {
}
@VisibleForTesting
- Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() {
+ Map<Consumer, RemovedHashRanges> getRemovedHashRanges() {
return removedHashRanges;
}
-
- @VisibleForTesting
- Map<Consumer, UpdatedHashRanges> getAddedHashRanges() {
- return addedHashRanges;
- }
-
- public enum OperationType {
- ADD, REMOVE
- }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
similarity index 94%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
index e6f3e6709cf..d9bd502255b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java
@@ -29,10 +29,10 @@ import org.apache.pulsar.client.api.Range;
*/
@EqualsAndHashCode
@ToString
-public class UpdatedHashRanges {
+public class RemovedHashRanges {
private final Range[] sortedRanges;
- private UpdatedHashRanges(List<Range> ranges) {
+ private RemovedHashRanges(List<Range> ranges) {
// Converts the set of ranges to an array to avoid iterator allocation
// when the ranges are iterator multiple times in the pending
acknowledgments loop.
this.sortedRanges = ranges.toArray(new Range[0]);
@@ -52,8 +52,8 @@ public class UpdatedHashRanges {
}
}
- public static UpdatedHashRanges of(List<Range> ranges) {
- return new UpdatedHashRanges(ranges);
+ public static RemovedHashRanges of(List<Range> ranges) {
+ return new RemovedHashRanges(ranges);
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d803bc7d963..9e92a2ab40d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,25 +170,16 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private synchronized void registerDrainingHashes(Consumer skipConsumer,
ImpactedConsumersResult
impactedConsumers) {
- impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges,
opType) -> {
+ impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
if (c != skipConsumer) {
c.getPendingAcks().forEach((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
log.warn("[{}] Sticky key hash was missing for {}:{}",
getName(), ledgerId, entryId);
return;
}
- if (updatedHashRanges.containsStickyKey(stickyKeyHash)) {
- switch (opType) {
- //reduce ref count in case the stickyKeyHash was
re-assigned to the original consumer
- case ADD -> {
- var entry =
drainingHashesTracker.getEntry(stickyKeyHash);
- if (entry != null && entry.getConsumer() == c)
{
- drainingHashesTracker.reduceRefCount(c,
stickyKeyHash, false);
- }
- }
- // add the pending ack to the draining hashes
tracker if the hash is in the range
- case REMOVE -> drainingHashesTracker.addEntry(c,
stickyKeyHash);
- }
+ if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
+ // add the pending ack to the draining hashes tracker
if the hash is in the range
+ drainingHashesTracker.addEntry(c, stickyKeyHash);
}
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index d02b9ca27a2..82891bfe9c4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -36,6 +36,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableInt;
+import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.assertj.core.data.Offset;
import org.mockito.Mockito;
@@ -46,7 +47,7 @@ import org.testng.annotations.Test;
public class ConsistentHashingStickyKeyConsumerSelectorTest {
@Test
- public void testConsumerSelect() {
+ public void testConsumerSelect() throws ConsumerAssignException {
ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(200);
String key1 = "anyKey";
@@ -150,7 +151,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest
{
@Test
- public void testGetConsumerKeyHashRanges() {
+ public void testGetConsumerKeyHashRanges() throws
BrokerServiceException.ConsumerAssignException {
ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(3);
List<String> consumerName = Arrays.asList("consumer1", "consumer2",
"consumer3");
List<Consumer> consumers = new ArrayList<>();
@@ -200,7 +201,8 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest
{
}
@Test
- public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() {
+ public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
+ throws BrokerServiceException.ConsumerAssignException {
ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(200);
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 20; i++) {
@@ -528,24 +530,11 @@ public class
ConsistentHashingStickyKeyConsumerSelectorTest {
selector.removeConsumer(consumer);
ConsumerHashAssignmentsSnapshot assignmentsAfter =
selector.getConsumerHashAssignmentsSnapshot();
- ImpactedConsumersResult impactedConsumersAfterRemoval =
assignmentsBefore
- .resolveImpactedConsumers(assignmentsAfter);
- assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges())
+
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges())
.describedAs(
"when a consumer is removed, the removed hash
ranges should only be from "
+ "the removed consumer")
.containsOnlyKeys(consumer);
- List<Range> allAddedRangesAfterRemoval =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-
impactedConsumersAfterRemoval.getAddedHashRanges().values().stream()
-
.map(UpdatedHashRanges::asRanges).flatMap(List::stream)
- .collect(Collectors.toCollection(TreeSet::new))
- );
- assertThat(allAddedRangesAfterRemoval)
- .describedAs(
- "when a consumer is removed, all its hash ranges
should appear "
- + "in added hash ranges"
- )
-
.containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer));
assignmentsBefore = assignmentsAfter;
// add consumer back
@@ -554,9 +543,8 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest
{
assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
List<Range> addedConsumerRanges =
assignmentsAfter.getRangesByConsumer().get(consumer);
- ImpactedConsumersResult impactedConsumersAfterAdding =
assignmentsBefore
- .resolveImpactedConsumers(assignmentsAfter);
- Map<Consumer, UpdatedHashRanges> removedHashRanges =
impactedConsumersAfterAdding.getRemovedHashRanges();
+ Map<Consumer, RemovedHashRanges> removedHashRanges =
+
assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
ConsumerHashAssignmentsSnapshot finalAssignmentsBefore =
assignmentsBefore;
assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> {
assertThat(removedHashRange
@@ -570,19 +558,12 @@ public class
ConsistentHashingStickyKeyConsumerSelectorTest {
List<Range> allRemovedRanges =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
- removedHashRanges.values().stream()
- .map(UpdatedHashRanges::asRanges)
+
removedHashRanges.entrySet().stream().map(Map.Entry::getValue)
+ .map(RemovedHashRanges::asRanges)
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
assertThat(allRemovedRanges)
.describedAs("all removed ranges should be the same as the
ranges of the added consumer")
.containsExactlyElementsOf(addedConsumerRanges);
- List<Range> allAddedRangesAfterAdding =
ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges(
-
impactedConsumersAfterAdding.getAddedHashRanges().values().stream()
- .map(UpdatedHashRanges::asRanges)
-
.flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new)));
- assertThat(addedConsumerRanges)
- .describedAs("all added ranges should be the same as the
ranges of the added consumer")
- .containsExactlyElementsOf(allAddedRangesAfterAdding);
assignmentsBefore = assignmentsAfter;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
index 93e53aba7b3..5c886b6eec9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java
@@ -129,16 +129,14 @@ public class ConsumerHashAssignmentsSnapshotTest {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
- Range notModifiedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
- mappingBefore.add(new HashRangeAssignment(notModifiedRange,
consumer1));
- mappingAfter.add(new HashRangeAssignment(notModifiedRange, consumer1));
+ mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+ mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
- assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
}
@Test
@@ -146,59 +144,49 @@ public class ConsumerHashAssignmentsSnapshotTest {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
- Range reAssignedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
Consumer consumer2 = createMockConsumer("consumer2");
- mappingBefore.add(new HashRangeAssignment(reAssignedRange, consumer1));
- mappingAfter.add(new HashRangeAssignment(reAssignedRange, consumer2));
+ mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
+ mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer2));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1,
UpdatedHashRanges.of(List.of(reAssignedRange))));
-
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer2,
UpdatedHashRanges.of(List.of(reAssignedRange)))
- );
+ Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1,
5)))));
}
@Test
- public void testResolveConsumerUpdatedHashRanges_RangeAdded() {
+ public void testResolveConsumerRemovedHashRanges_RangeAdded() {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
- Range addedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
- mappingAfter.add(new HashRangeAssignment(addedRange, consumer1));
+ mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty();
-
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1, UpdatedHashRanges.of(List.of(addedRange)))
- );
}
@Test
- public void testResolveConsumerRemovedHashRanges_RangeUpdated() {
+ public void testResolveConsumerRemovedHashRanges_RangeRemoved() {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
- Range removedRange = Range.of(1, 5);
Consumer consumer1 = createMockConsumer("consumer1");
- mappingBefore.add(new HashRangeAssignment(removedRange, consumer1));
+ mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1,
UpdatedHashRanges.of(List.of(removedRange))));
- assertThat(impactedConsumers.getAddedHashRanges()).isEmpty();
+ Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1,
5)))));
}
@Test
- public void testResolveConsumerUpdatedHashRanges_OverlappingRanges() {
+ public void testResolveConsumerRemovedHashRanges_OverlappingRanges() {
List<HashRangeAssignment> mappingBefore = new ArrayList<>();
List<HashRangeAssignment> mappingAfter = new ArrayList<>();
@@ -208,12 +196,9 @@ public class ConsumerHashAssignmentsSnapshotTest {
mappingAfter.add(new HashRangeAssignment(Range.of(3, 7), consumer2));
ImpactedConsumersResult impactedConsumers =
-
ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore,
mappingAfter);
+
ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore,
mappingAfter);
assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer1, UpdatedHashRanges.of(List.of(Range.of(3,
5)))));
-
assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf(
- Map.of(consumer2, UpdatedHashRanges.of(List.of(Range.of(3,
7))))
- );
+ Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(3,
5)))));
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 80b5070077d..af7c32192c7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,7 +30,6 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
@@ -920,136 +919,6 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
consumer.close();
}
- /*
- Verifies https://github.com/apache/pulsar/issues/23845 issue fixed
-
- The scenario of verification
-
- Preconditions:
-
- 1. Consumer "consumer" up and running
- 2. Producer "producer" connected to broker
-
- Steps to reproduce:
- 1. "producer" send message with key "testMessageKey"
- Example: "testMessage" hash is always 124
-
- 2. "consumer" receives BUT DO NOT acknowledge message with key
"testMessageKey"
- Example: "consumer" hash range: [0-256]. "testMessage" hash range:
[0-256]
-
- 3. Add more consumers until hash range of message with key
"testMessageKey" removed from "consumer" hash ranges
- Example: "consumer" hash range: [0-123]. "consumer N" hash range:
[124-136]. "testMessage" hash range: [124-136]
-
- 4. Stop the added consumers until message key "testMessageKey" hash
range moved to "consumer" hash ranges
- Example: "consumer" hash range: [0-128]. "testMessage" hash range:
[0-128]
-
- 5. Add more consumers until hash range of message with key
"testMessageKey" removed from "consumer" hash ranges
- Example: "consumer" hash range: [0-96]. "consumer N" hash range:
[121-154]. "testMessage" hash range: [121-154]
-
- 6. Stop "consumer"
-
- Actual result for bug:
- No message with key "testMessageKey" received by consumer who owns
message's hash range
-
- Expected result:
- The message with key "testMessageKey" received by consumer who owns
message's hash range
- Example: "consumer N" with hash range [121-154] received the message
-
- */
- @Test
- void testMessageDeliveredFromDrainingHashes() throws PulsarClientException
{
- String messageKey = "testMessageKey";
- String topic = "testMessageDeliveredFromDrainingHashes" +
UUID.randomUUID();
-
- List<Consumer<Integer>> consumers = new ArrayList<>();
-
- @Cleanup
- Consumer<Integer> consumer = createConsumer(topic);
- String initialOwnerName = consumer.getConsumerName();
-
- @Cleanup
- Producer<Integer> producer = createProducer(topic, false);
- producer.newMessage().key(messageKey).value(1).send();
-
- // receive but not acknowledge the message to leave it in pending acks
- Message<Integer> received = consumer.receive();
- assertThat(received.getKey()).isEqualTo(messageKey);
-
- StickyKeyDispatcher dispatcher = getDispatcher(topic,
SUBSCRIPTION_NAME);
- StickyKeyConsumerSelector selector = dispatcher.getSelector();
- int messageKeyHash =
selector.makeStickyKeyHash(messageKey.getBytes(StandardCharsets.UTF_8));
-
- try {
- // add new consumers until hash range of the initial message owner
moved
- Pair<String, List<Consumer<Integer>>> addedConsumers =
addConsumersUntilOwnerChanged(
- topic, initialOwnerName, messageKeyHash, selector
- );
- String currentOwnerName = addedConsumers.getKey();
- consumers.addAll(addedConsumers.getValue());
-
- // returning hash range to the initial owner
- for (int i = consumers.size() - 1; i > -1 &&
!initialOwnerName.equals(currentOwnerName); i--) {
- consumers.remove(i).close();
- currentOwnerName = findOwnerName(selector, messageKeyHash);
- }
-
- // add new consumers until hash range of the initial message owner
moved
- Pair<String, List<Consumer<Integer>>> addedConsumersAfter =
addConsumersUntilOwnerChanged(
- topic, initialOwnerName, messageKeyHash, selector
- );
-
- String currentOwnerNameAfter = addedConsumersAfter.getKey();
- consumers.addAll(addedConsumersAfter.getValue());
-
- // remove the initial owner
- consumer.close();
-
- // verify the message sent to new consumer which owns hash range
- Optional<Consumer<Integer>> theNewOwnerMaybe = consumers
- .stream()
- .filter(c ->
c.getConsumerName().equals(currentOwnerNameAfter))
- .findAny();
-
- assertThat(theNewOwnerMaybe).isPresent();
-
- Consumer<Integer> theNewOwner = theNewOwnerMaybe.get();
-
- Message<Integer> message = theNewOwner.receive(5,
TimeUnit.SECONDS);
- assertThat(message).describedAs("Message with key " + messageKey
- + " expected to be delivered to consumer " +
theNewOwner.getConsumerName()).isNotNull();
- assertThat(message.getKey()).isEqualTo(messageKey);
- } finally {
- for (Consumer<Integer> c : consumers) {
- c.close();
- }
- }
- }
-
- private Pair<String, List<Consumer<Integer>>>
addConsumersUntilOwnerChanged(
- String topic,
- String initialOwnerName,
- int messageKeyHash,
- StickyKeyConsumerSelector selector
- ) throws PulsarClientException {
- String currentOwnerName;
- List<Consumer<Integer>> consumers = new ArrayList<>();
- do {
- Consumer<Integer> addedC = createConsumer(topic);
- consumers.add(addedC);
- currentOwnerName = findOwnerName(selector, messageKeyHash);
- } while (initialOwnerName.equals(currentOwnerName));
- return Pair.of(currentOwnerName, consumers);
- }
-
- private String findOwnerName(StickyKeyConsumerSelector selector, int hash)
{
- return selector.getConsumerKeyHashRanges().entrySet().stream()
- .filter(entry ->
- entry.getValue().stream().anyMatch(range ->
range.contains(hash))
- )
- .findAny().orElseThrow(() -> new IllegalArgumentException("No
owner for the hash " + hash))
- .getKey().consumerName();
- }
-
@Test(dataProvider = "currentImplementationType")
public void testAttachKeyToMessageMetadata(KeySharedImplementationType
impl) throws PulsarClientException {
String topic = "persistent://public/default/key_shared-" +
UUID.randomUUID();