This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2c25721821af8183d6e5a6c115e4958210eb8453 Author: nborisov <[email protected]> AuthorDate: Fri Sep 12 15:53:48 2025 +0300 [fix][broker] Key_Shared subscription doesn't always deliver messages from the replay queue after a consumer disconnects and leaves a backlog (#24736) Co-authored-by: Nikolai Borisov <[email protected]> (cherry picked from commit 80beab677b699e20ea59d49950442c5543a6540d) --- .../service/ConsumerHashAssignmentsSnapshot.java | 44 ++++--- .../broker/service/DrainingHashesTracker.java | 8 ++ .../broker/service/ImpactedConsumersResult.java | 32 +++-- ...movedHashRanges.java => UpdatedHashRanges.java} | 8 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 17 ++- ...istentHashingStickyKeyConsumerSelectorTest.java | 39 ++++-- .../ConsumerHashAssignmentsSnapshotTest.java | 49 +++++--- .../client/api/KeySharedSubscriptionTest.java | 131 +++++++++++++++++++++ 8 files changed, 266 insertions(+), 62 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java index 18ea927c642..0607711826b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java @@ -72,7 +72,7 @@ public class ConsumerHashAssignmentsSnapshot { } public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) { - return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments); + return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments); } /** @@ -111,28 +111,36 @@ public class ConsumerHashAssignmentsSnapshot { * @param mappingAfter the range mapping after the change * @return consumers and ranges where the existing range changed */ - static ImpactedConsumersResult resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore, + static ImpactedConsumersResult resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore, List<HashRangeAssignment> mappingAfter) { Map<Range, Pair<Consumer, Consumer>> impactedRanges = diffRanges(mappingBefore, mappingAfter); - Map<Consumer, SortedSet<Range>> removedRangesByConsumer = impactedRanges.entrySet().stream() - .collect(IdentityHashMap::new, (resultMap, entry) -> { - Range range = entry.getKey(); - // filter out only where the range was removed - Consumer consumerBefore = entry.getValue().getLeft(); - if (consumerBefore != null) { - resultMap.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range); - } - }, IdentityHashMap::putAll); - return mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer); + Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new IdentityHashMap<>(); + Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new IdentityHashMap<>(); + impactedRanges.forEach((range, value) -> { + Consumer consumerAfter = value.getRight(); + + // last consumer was removed + if (consumerAfter != null) { + addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new TreeSet<>()).add(range); + } + // filter out only where the range was removed + Consumer consumerBefore = value.getLeft(); + if (consumerBefore != null) { + removedRangesByConsumer.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range); + } + }); + var removedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer); + var addedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer); + return ImpactedConsumersResult.of(removedMerged, addedMerged); } - static ImpactedConsumersResult mergedOverlappingRangesAndConvertToImpactedConsumersResult( - Map<Consumer, SortedSet<Range>> removedRangesByConsumer) { - Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>(); - removedRangesByConsumer.forEach((consumer, ranges) -> { - mergedRangesByConsumer.put(consumer, RemovedHashRanges.of(mergeOverlappingRanges(ranges))); + static Map<Consumer, UpdatedHashRanges> mergedOverlappingRangesAndConvertToImpactedConsumersResult( + Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) { + Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>(); + updatedRangesByConsumer.forEach((consumer, ranges) -> { + mergedRangesByConsumer.put(consumer, UpdatedHashRanges.of(mergeOverlappingRanges(ranges))); }); - return ImpactedConsumersResult.of(mergedRangesByConsumer); + return mergedRangesByConsumer; } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 9bc5c5f1e44..51c45817368 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -400,6 +400,14 @@ public class DrainingHashesTracker { } finally { lock.writeLock().unlock(); } + + // update the consumer specific stats + ConsumerDrainingHashesStats drainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (drainingHashesStats != null) { + drainingHashesStats.clearHash(stickyKeyHash); + } + return false; } // increment the blocked count which is used to determine if the hash is blocking diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java index a525b0501d7..bb888db1ca3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java @@ -30,22 +30,27 @@ import lombok.ToString; @EqualsAndHashCode @ToString public class ImpactedConsumersResult { - public interface RemovedHashRangesProcessor { - void process(Consumer consumer, RemovedHashRanges removedHashRanges); + public interface UpdatedHashRangesProcessor { + void process(Consumer consumer, UpdatedHashRanges updatedHashRanges, OperationType operationType); } - private final Map<Consumer, RemovedHashRanges> removedHashRanges; + private final Map<Consumer, UpdatedHashRanges> removedHashRanges; + private final Map<Consumer, UpdatedHashRanges> addedHashRanges; - private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges> removedHashRanges) { + private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges> removedHashRanges, + Map<Consumer, UpdatedHashRanges> addedHashRanges) { this.removedHashRanges = removedHashRanges; + this.addedHashRanges = addedHashRanges; } - public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges> removedHashRanges) { - return new ImpactedConsumersResult(removedHashRanges); + public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges> removedHashRanges, + Map<Consumer, UpdatedHashRanges> addedHashRanges) { + return new ImpactedConsumersResult(removedHashRanges, addedHashRanges); } - public void processRemovedHashRanges(RemovedHashRangesProcessor processor) { - removedHashRanges.forEach((c, r) -> processor.process(c, r)); + public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor) { + removedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.REMOVE)); + addedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.ADD)); } public boolean isEmpty() { @@ -53,7 +58,16 @@ public class ImpactedConsumersResult { } @VisibleForTesting - Map<Consumer, RemovedHashRanges> getRemovedHashRanges() { + Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() { return removedHashRanges; } + + @VisibleForTesting + Map<Consumer, UpdatedHashRanges> getAddedHashRanges() { + return addedHashRanges; + } + + public enum OperationType { + ADD, REMOVE + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java similarity index 94% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java index d9bd502255b..e6f3e6709cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java @@ -29,10 +29,10 @@ import org.apache.pulsar.client.api.Range; */ @EqualsAndHashCode @ToString -public class RemovedHashRanges { +public class UpdatedHashRanges { private final Range[] sortedRanges; - private RemovedHashRanges(List<Range> ranges) { + private UpdatedHashRanges(List<Range> ranges) { // Converts the set of ranges to an array to avoid iterator allocation // when the ranges are iterator multiple times in the pending acknowledgments loop. this.sortedRanges = ranges.toArray(new Range[0]); @@ -52,8 +52,8 @@ public class RemovedHashRanges { } } - public static RemovedHashRanges of(List<Range> ranges) { - return new RemovedHashRanges(ranges); + public static UpdatedHashRanges of(List<Range> ranges) { + return new UpdatedHashRanges(ranges); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index e0c74f9043c..c2afc35c619 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -169,16 +169,25 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private synchronized void registerDrainingHashes(Consumer skipConsumer, ImpactedConsumersResult impactedConsumers) { - impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> { + impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges, opType) -> { if (c != skipConsumer) { c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) { log.warn("[{}] Sticky key hash was missing for {}:{}", getName(), ledgerId, entryId); return; } - if (removedHashRanges.containsStickyKey(stickyKeyHash)) { - // add the pending ack to the draining hashes tracker if the hash is in the range - drainingHashesTracker.addEntry(c, stickyKeyHash); + if (updatedHashRanges.containsStickyKey(stickyKeyHash)) { + switch (opType) { + //reduce ref count in case the stickyKeyHash was re-assigned to the original consumer + case ADD -> { + var entry = drainingHashesTracker.getEntry(stickyKeyHash); + if (entry != null && entry.getConsumer() == c) { + drainingHashesTracker.reduceRefCount(c, stickyKeyHash, false); + } + } + // add the pending ack to the draining hashes tracker if the hash is in the range + case REMOVE -> drainingHashesTracker.addEntry(c, stickyKeyHash); + } } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java index 82891bfe9c4..d02b9ca27a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java @@ -36,7 +36,6 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; import org.apache.pulsar.client.api.Range; import org.assertj.core.data.Offset; import org.mockito.Mockito; @@ -47,7 +46,7 @@ import org.testng.annotations.Test; public class ConsistentHashingStickyKeyConsumerSelectorTest { @Test - public void testConsumerSelect() throws ConsumerAssignException { + public void testConsumerSelect() { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200); String key1 = "anyKey"; @@ -151,7 +150,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { @Test - public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException { + public void testGetConsumerKeyHashRanges() { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3); List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3"); List<Consumer> consumers = new ArrayList<>(); @@ -201,8 +200,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { } @Test - public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() - throws BrokerServiceException.ConsumerAssignException { + public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200); List<Consumer> consumers = new ArrayList<>(); for (int i = 0; i < 20; i++) { @@ -530,11 +528,24 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { selector.removeConsumer(consumer); ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot(); - assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()) + ImpactedConsumersResult impactedConsumersAfterRemoval = assignmentsBefore + .resolveImpactedConsumers(assignmentsAfter); + assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges()) .describedAs( "when a consumer is removed, the removed hash ranges should only be from " + "the removed consumer") .containsOnlyKeys(consumer); + List<Range> allAddedRangesAfterRemoval = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges( + impactedConsumersAfterRemoval.getAddedHashRanges().values().stream() + .map(UpdatedHashRanges::asRanges).flatMap(List::stream) + .collect(Collectors.toCollection(TreeSet::new)) + ); + assertThat(allAddedRangesAfterRemoval) + .describedAs( + "when a consumer is removed, all its hash ranges should appear " + + "in added hash ranges" + ) + .containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer)); assignmentsBefore = assignmentsAfter; // add consumer back @@ -543,8 +554,9 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot(); List<Range> addedConsumerRanges = assignmentsAfter.getRangesByConsumer().get(consumer); - Map<Consumer, RemovedHashRanges> removedHashRanges = - assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges(); + ImpactedConsumersResult impactedConsumersAfterAdding = assignmentsBefore + .resolveImpactedConsumers(assignmentsAfter); + Map<Consumer, UpdatedHashRanges> removedHashRanges = impactedConsumersAfterAdding.getRemovedHashRanges(); ConsumerHashAssignmentsSnapshot finalAssignmentsBefore = assignmentsBefore; assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> { assertThat(removedHashRange @@ -558,12 +570,19 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { List<Range> allRemovedRanges = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges( - removedHashRanges.entrySet().stream().map(Map.Entry::getValue) - .map(RemovedHashRanges::asRanges) + removedHashRanges.values().stream() + .map(UpdatedHashRanges::asRanges) .flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new))); assertThat(allRemovedRanges) .describedAs("all removed ranges should be the same as the ranges of the added consumer") .containsExactlyElementsOf(addedConsumerRanges); + List<Range> allAddedRangesAfterAdding = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges( + impactedConsumersAfterAdding.getAddedHashRanges().values().stream() + .map(UpdatedHashRanges::asRanges) + .flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new))); + assertThat(addedConsumerRanges) + .describedAs("all added ranges should be the same as the ranges of the added consumer") + .containsExactlyElementsOf(allAddedRangesAfterAdding); assignmentsBefore = assignmentsAfter; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java index 5c886b6eec9..93e53aba7b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java @@ -129,14 +129,16 @@ public class ConsumerHashAssignmentsSnapshotTest { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); + Range notModifiedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); - mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); - mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); + mappingBefore.add(new HashRangeAssignment(notModifiedRange, consumer1)); + mappingAfter.add(new HashRangeAssignment(notModifiedRange, consumer1)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty(); + assertThat(impactedConsumers.getAddedHashRanges()).isEmpty(); } @Test @@ -144,49 +146,59 @@ public class ConsumerHashAssignmentsSnapshotTest { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); + Range reAssignedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); Consumer consumer2 = createMockConsumer("consumer2"); - mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); - mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer2)); + mappingBefore.add(new HashRangeAssignment(reAssignedRange, consumer1)); + mappingAfter.add(new HashRangeAssignment(reAssignedRange, consumer2)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 5))))); + Map.of(consumer1, UpdatedHashRanges.of(List.of(reAssignedRange)))); + assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf( + Map.of(consumer2, UpdatedHashRanges.of(List.of(reAssignedRange))) + ); } @Test - public void testResolveConsumerRemovedHashRanges_RangeAdded() { + public void testResolveConsumerUpdatedHashRanges_RangeAdded() { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); + Range addedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); - mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); + mappingAfter.add(new HashRangeAssignment(addedRange, consumer1)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty(); + assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf( + Map.of(consumer1, UpdatedHashRanges.of(List.of(addedRange))) + ); } @Test - public void testResolveConsumerRemovedHashRanges_RangeRemoved() { + public void testResolveConsumerRemovedHashRanges_RangeUpdated() { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); + Range removedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); - mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); + mappingBefore.add(new HashRangeAssignment(removedRange, consumer1)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 5))))); + Map.of(consumer1, UpdatedHashRanges.of(List.of(removedRange)))); + assertThat(impactedConsumers.getAddedHashRanges()).isEmpty(); } @Test - public void testResolveConsumerRemovedHashRanges_OverlappingRanges() { + public void testResolveConsumerUpdatedHashRanges_OverlappingRanges() { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); @@ -196,9 +208,12 @@ public class ConsumerHashAssignmentsSnapshotTest { mappingAfter.add(new HashRangeAssignment(Range.of(3, 7), consumer2)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(3, 5))))); + Map.of(consumer1, UpdatedHashRanges.of(List.of(Range.of(3, 5))))); + assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf( + Map.of(consumer2, UpdatedHashRanges.of(List.of(Range.of(3, 7)))) + ); } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index af7c32192c7..80b5070077d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -30,6 +30,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; @@ -919,6 +920,136 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { consumer.close(); } + /* + Verifies https://github.com/apache/pulsar/issues/23845 issue fixed + + The scenario of verification + + Preconditions: + + 1. Consumer "consumer" up and running + 2. Producer "producer" connected to broker + + Steps to reproduce: + 1. "producer" send message with key "testMessageKey" + Example: "testMessage" hash is always 124 + + 2. "consumer" receives BUT DO NOT acknowledge message with key "testMessageKey" + Example: "consumer" hash range: [0-256]. "testMessage" hash range: [0-256] + + 3. Add more consumers until hash range of message with key "testMessageKey" removed from "consumer" hash ranges + Example: "consumer" hash range: [0-123]. "consumer N" hash range: [124-136]. "testMessage" hash range: [124-136] + + 4. Stop the added consumers until message key "testMessageKey" hash range moved to "consumer" hash ranges + Example: "consumer" hash range: [0-128]. "testMessage" hash range: [0-128] + + 5. Add more consumers until hash range of message with key "testMessageKey" removed from "consumer" hash ranges + Example: "consumer" hash range: [0-96]. "consumer N" hash range: [121-154]. "testMessage" hash range: [121-154] + + 6. Stop "consumer" + + Actual result for bug: + No message with key "testMessageKey" received by consumer who owns message's hash range + + Expected result: + The message with key "testMessageKey" received by consumer who owns message's hash range + Example: "consumer N" with hash range [121-154] received the message + + */ + @Test + void testMessageDeliveredFromDrainingHashes() throws PulsarClientException { + String messageKey = "testMessageKey"; + String topic = "testMessageDeliveredFromDrainingHashes" + UUID.randomUUID(); + + List<Consumer<Integer>> consumers = new ArrayList<>(); + + @Cleanup + Consumer<Integer> consumer = createConsumer(topic); + String initialOwnerName = consumer.getConsumerName(); + + @Cleanup + Producer<Integer> producer = createProducer(topic, false); + producer.newMessage().key(messageKey).value(1).send(); + + // receive but not acknowledge the message to leave it in pending acks + Message<Integer> received = consumer.receive(); + assertThat(received.getKey()).isEqualTo(messageKey); + + StickyKeyDispatcher dispatcher = getDispatcher(topic, SUBSCRIPTION_NAME); + StickyKeyConsumerSelector selector = dispatcher.getSelector(); + int messageKeyHash = selector.makeStickyKeyHash(messageKey.getBytes(StandardCharsets.UTF_8)); + + try { + // add new consumers until hash range of the initial message owner moved + Pair<String, List<Consumer<Integer>>> addedConsumers = addConsumersUntilOwnerChanged( + topic, initialOwnerName, messageKeyHash, selector + ); + String currentOwnerName = addedConsumers.getKey(); + consumers.addAll(addedConsumers.getValue()); + + // returning hash range to the initial owner + for (int i = consumers.size() - 1; i > -1 && !initialOwnerName.equals(currentOwnerName); i--) { + consumers.remove(i).close(); + currentOwnerName = findOwnerName(selector, messageKeyHash); + } + + // add new consumers until hash range of the initial message owner moved + Pair<String, List<Consumer<Integer>>> addedConsumersAfter = addConsumersUntilOwnerChanged( + topic, initialOwnerName, messageKeyHash, selector + ); + + String currentOwnerNameAfter = addedConsumersAfter.getKey(); + consumers.addAll(addedConsumersAfter.getValue()); + + // remove the initial owner + consumer.close(); + + // verify the message sent to new consumer which owns hash range + Optional<Consumer<Integer>> theNewOwnerMaybe = consumers + .stream() + .filter(c -> c.getConsumerName().equals(currentOwnerNameAfter)) + .findAny(); + + assertThat(theNewOwnerMaybe).isPresent(); + + Consumer<Integer> theNewOwner = theNewOwnerMaybe.get(); + + Message<Integer> message = theNewOwner.receive(5, TimeUnit.SECONDS); + assertThat(message).describedAs("Message with key " + messageKey + + " expected to be delivered to consumer " + theNewOwner.getConsumerName()).isNotNull(); + assertThat(message.getKey()).isEqualTo(messageKey); + } finally { + for (Consumer<Integer> c : consumers) { + c.close(); + } + } + } + + private Pair<String, List<Consumer<Integer>>> addConsumersUntilOwnerChanged( + String topic, + String initialOwnerName, + int messageKeyHash, + StickyKeyConsumerSelector selector + ) throws PulsarClientException { + String currentOwnerName; + List<Consumer<Integer>> consumers = new ArrayList<>(); + do { + Consumer<Integer> addedC = createConsumer(topic); + consumers.add(addedC); + currentOwnerName = findOwnerName(selector, messageKeyHash); + } while (initialOwnerName.equals(currentOwnerName)); + return Pair.of(currentOwnerName, consumers); + } + + private String findOwnerName(StickyKeyConsumerSelector selector, int hash) { + return selector.getConsumerKeyHashRanges().entrySet().stream() + .filter(entry -> + entry.getValue().stream().anyMatch(range -> range.contains(hash)) + ) + .findAny().orElseThrow(() -> new IllegalArgumentException("No owner for the hash " + hash)) + .getKey().consumerName(); + } + @Test(dataProvider = "currentImplementationType") public void testAttachKeyToMessageMetadata(KeySharedImplementationType impl) throws PulsarClientException { String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
