This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch revert-24732-fix-key-shared-bug-23845 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9183d3c9e3f81f4729060233c1c2f719f792cd9e Author: Lari Hotari <[email protected]> AuthorDate: Fri Sep 12 12:33:32 2025 +0300 Revert "[fix][broker] Key_Shared subscription doesn't always deliver messages…" This reverts commit ca8d33835dba2b6200c68838af13a8cced41282b. --- .../service/ConsumerHashAssignmentsSnapshot.java | 44 +++---- .../broker/service/DrainingHashesTracker.java | 8 -- .../broker/service/ImpactedConsumersResult.java | 32 ++--- ...datedHashRanges.java => RemovedHashRanges.java} | 8 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 17 +-- ...istentHashingStickyKeyConsumerSelectorTest.java | 39 ++---- .../ConsumerHashAssignmentsSnapshotTest.java | 49 +++----- .../client/api/KeySharedSubscriptionTest.java | 131 --------------------- 8 files changed, 62 insertions(+), 266 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java index 0607711826b..18ea927c642 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java @@ -72,7 +72,7 @@ public class ConsumerHashAssignmentsSnapshot { } public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) { - return resolveConsumerUpdatedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments); + return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments); } /** @@ -111,36 +111,28 @@ public class ConsumerHashAssignmentsSnapshot { * @param mappingAfter the range mapping after the change * @return consumers and ranges where the existing range changed */ - static ImpactedConsumersResult resolveConsumerUpdatedHashRanges(List<HashRangeAssignment> mappingBefore, + static ImpactedConsumersResult resolveConsumerRemovedHashRanges(List<HashRangeAssignment> mappingBefore, List<HashRangeAssignment> mappingAfter) { Map<Range, Pair<Consumer, Consumer>> impactedRanges = diffRanges(mappingBefore, mappingAfter); - Map<Consumer, SortedSet<Range>> addedRangesByConsumer = new IdentityHashMap<>(); - Map<Consumer, SortedSet<Range>> removedRangesByConsumer = new IdentityHashMap<>(); - impactedRanges.forEach((range, value) -> { - Consumer consumerAfter = value.getRight(); - - // last consumer was removed - if (consumerAfter != null) { - addedRangesByConsumer.computeIfAbsent(consumerAfter, k -> new TreeSet<>()).add(range); - } - // filter out only where the range was removed - Consumer consumerBefore = value.getLeft(); - if (consumerBefore != null) { - removedRangesByConsumer.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range); - } - }); - var removedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer); - var addedMerged = mergedOverlappingRangesAndConvertToImpactedConsumersResult(addedRangesByConsumer); - return ImpactedConsumersResult.of(removedMerged, addedMerged); + Map<Consumer, SortedSet<Range>> removedRangesByConsumer = impactedRanges.entrySet().stream() + .collect(IdentityHashMap::new, (resultMap, entry) -> { + Range range = entry.getKey(); + // filter out only where the range was removed + Consumer consumerBefore = entry.getValue().getLeft(); + if (consumerBefore != null) { + resultMap.computeIfAbsent(consumerBefore, k -> new TreeSet<>()).add(range); + } + }, IdentityHashMap::putAll); + return mergedOverlappingRangesAndConvertToImpactedConsumersResult(removedRangesByConsumer); } - static Map<Consumer, UpdatedHashRanges> mergedOverlappingRangesAndConvertToImpactedConsumersResult( - Map<Consumer, SortedSet<Range>> updatedRangesByConsumer) { - Map<Consumer, UpdatedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>(); - updatedRangesByConsumer.forEach((consumer, ranges) -> { - mergedRangesByConsumer.put(consumer, UpdatedHashRanges.of(mergeOverlappingRanges(ranges))); + static ImpactedConsumersResult mergedOverlappingRangesAndConvertToImpactedConsumersResult( + Map<Consumer, SortedSet<Range>> removedRangesByConsumer) { + Map<Consumer, RemovedHashRanges> mergedRangesByConsumer = new IdentityHashMap<>(); + removedRangesByConsumer.forEach((consumer, ranges) -> { + mergedRangesByConsumer.put(consumer, RemovedHashRanges.of(mergeOverlappingRanges(ranges))); }); - return mergedRangesByConsumer; + return ImpactedConsumersResult.of(mergedRangesByConsumer); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 51c45817368..9bc5c5f1e44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -400,14 +400,6 @@ public class DrainingHashesTracker { } finally { lock.writeLock().unlock(); } - - // update the consumer specific stats - ConsumerDrainingHashesStats drainingHashesStats = - consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); - if (drainingHashesStats != null) { - drainingHashesStats.clearHash(stickyKeyHash); - } - return false; } // increment the blocked count which is used to determine if the hash is blocking diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java index bb888db1ca3..a525b0501d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ImpactedConsumersResult.java @@ -30,27 +30,22 @@ import lombok.ToString; @EqualsAndHashCode @ToString public class ImpactedConsumersResult { - public interface UpdatedHashRangesProcessor { - void process(Consumer consumer, UpdatedHashRanges updatedHashRanges, OperationType operationType); + public interface RemovedHashRangesProcessor { + void process(Consumer consumer, RemovedHashRanges removedHashRanges); } - private final Map<Consumer, UpdatedHashRanges> removedHashRanges; - private final Map<Consumer, UpdatedHashRanges> addedHashRanges; + private final Map<Consumer, RemovedHashRanges> removedHashRanges; - private ImpactedConsumersResult(Map<Consumer, UpdatedHashRanges> removedHashRanges, - Map<Consumer, UpdatedHashRanges> addedHashRanges) { + private ImpactedConsumersResult(Map<Consumer, RemovedHashRanges> removedHashRanges) { this.removedHashRanges = removedHashRanges; - this.addedHashRanges = addedHashRanges; } - public static ImpactedConsumersResult of(Map<Consumer, UpdatedHashRanges> removedHashRanges, - Map<Consumer, UpdatedHashRanges> addedHashRanges) { - return new ImpactedConsumersResult(removedHashRanges, addedHashRanges); + public static ImpactedConsumersResult of(Map<Consumer, RemovedHashRanges> removedHashRanges) { + return new ImpactedConsumersResult(removedHashRanges); } - public void processUpdatedHashRanges(UpdatedHashRangesProcessor processor) { - removedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.REMOVE)); - addedHashRanges.forEach((c, r) -> processor.process(c, r, OperationType.ADD)); + public void processRemovedHashRanges(RemovedHashRangesProcessor processor) { + removedHashRanges.forEach((c, r) -> processor.process(c, r)); } public boolean isEmpty() { @@ -58,16 +53,7 @@ public class ImpactedConsumersResult { } @VisibleForTesting - Map<Consumer, UpdatedHashRanges> getRemovedHashRanges() { + Map<Consumer, RemovedHashRanges> getRemovedHashRanges() { return removedHashRanges; } - - @VisibleForTesting - Map<Consumer, UpdatedHashRanges> getAddedHashRanges() { - return addedHashRanges; - } - - public enum OperationType { - ADD, REMOVE - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java similarity index 94% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java index e6f3e6709cf..d9bd502255b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/UpdatedHashRanges.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RemovedHashRanges.java @@ -29,10 +29,10 @@ import org.apache.pulsar.client.api.Range; */ @EqualsAndHashCode @ToString -public class UpdatedHashRanges { +public class RemovedHashRanges { private final Range[] sortedRanges; - private UpdatedHashRanges(List<Range> ranges) { + private RemovedHashRanges(List<Range> ranges) { // Converts the set of ranges to an array to avoid iterator allocation // when the ranges are iterator multiple times in the pending acknowledgments loop. this.sortedRanges = ranges.toArray(new Range[0]); @@ -52,8 +52,8 @@ public class UpdatedHashRanges { } } - public static UpdatedHashRanges of(List<Range> ranges) { - return new UpdatedHashRanges(ranges); + public static RemovedHashRanges of(List<Range> ranges) { + return new RemovedHashRanges(ranges); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d803bc7d963..9e92a2ab40d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -170,25 +170,16 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private synchronized void registerDrainingHashes(Consumer skipConsumer, ImpactedConsumersResult impactedConsumers) { - impactedConsumers.processUpdatedHashRanges((c, updatedHashRanges, opType) -> { + impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> { if (c != skipConsumer) { c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) { log.warn("[{}] Sticky key hash was missing for {}:{}", getName(), ledgerId, entryId); return; } - if (updatedHashRanges.containsStickyKey(stickyKeyHash)) { - switch (opType) { - //reduce ref count in case the stickyKeyHash was re-assigned to the original consumer - case ADD -> { - var entry = drainingHashesTracker.getEntry(stickyKeyHash); - if (entry != null && entry.getConsumer() == c) { - drainingHashesTracker.reduceRefCount(c, stickyKeyHash, false); - } - } - // add the pending ack to the draining hashes tracker if the hash is in the range - case REMOVE -> drainingHashesTracker.addEntry(c, stickyKeyHash); - } + if (removedHashRanges.containsStickyKey(stickyKeyHash)) { + // add the pending ack to the draining hashes tracker if the hash is in the range + drainingHashesTracker.addEntry(c, stickyKeyHash); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java index d02b9ca27a2..82891bfe9c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java @@ -36,6 +36,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; import org.apache.pulsar.client.api.Range; import org.assertj.core.data.Offset; import org.mockito.Mockito; @@ -46,7 +47,7 @@ import org.testng.annotations.Test; public class ConsistentHashingStickyKeyConsumerSelectorTest { @Test - public void testConsumerSelect() { + public void testConsumerSelect() throws ConsumerAssignException { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200); String key1 = "anyKey"; @@ -150,7 +151,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { @Test - public void testGetConsumerKeyHashRanges() { + public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3); List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3"); List<Consumer> consumers = new ArrayList<>(); @@ -200,7 +201,8 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { } @Test - public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() { + public void testConsumersGetSufficientlyAccuratelyEvenlyMapped() + throws BrokerServiceException.ConsumerAssignException { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200); List<Consumer> consumers = new ArrayList<>(); for (int i = 0; i < 20; i++) { @@ -528,24 +530,11 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { selector.removeConsumer(consumer); ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot(); - ImpactedConsumersResult impactedConsumersAfterRemoval = assignmentsBefore - .resolveImpactedConsumers(assignmentsAfter); - assertThat(impactedConsumersAfterRemoval.getRemovedHashRanges()) + assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()) .describedAs( "when a consumer is removed, the removed hash ranges should only be from " + "the removed consumer") .containsOnlyKeys(consumer); - List<Range> allAddedRangesAfterRemoval = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges( - impactedConsumersAfterRemoval.getAddedHashRanges().values().stream() - .map(UpdatedHashRanges::asRanges).flatMap(List::stream) - .collect(Collectors.toCollection(TreeSet::new)) - ); - assertThat(allAddedRangesAfterRemoval) - .describedAs( - "when a consumer is removed, all its hash ranges should appear " - + "in added hash ranges" - ) - .containsExactlyElementsOf(assignmentsBefore.getRangesByConsumer().get(consumer)); assignmentsBefore = assignmentsAfter; // add consumer back @@ -554,9 +543,8 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot(); List<Range> addedConsumerRanges = assignmentsAfter.getRangesByConsumer().get(consumer); - ImpactedConsumersResult impactedConsumersAfterAdding = assignmentsBefore - .resolveImpactedConsumers(assignmentsAfter); - Map<Consumer, UpdatedHashRanges> removedHashRanges = impactedConsumersAfterAdding.getRemovedHashRanges(); + Map<Consumer, RemovedHashRanges> removedHashRanges = + assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges(); ConsumerHashAssignmentsSnapshot finalAssignmentsBefore = assignmentsBefore; assertThat(removedHashRanges).allSatisfy((c, removedHashRange) -> { assertThat(removedHashRange @@ -570,19 +558,12 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { List<Range> allRemovedRanges = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges( - removedHashRanges.values().stream() - .map(UpdatedHashRanges::asRanges) + removedHashRanges.entrySet().stream().map(Map.Entry::getValue) + .map(RemovedHashRanges::asRanges) .flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new))); assertThat(allRemovedRanges) .describedAs("all removed ranges should be the same as the ranges of the added consumer") .containsExactlyElementsOf(addedConsumerRanges); - List<Range> allAddedRangesAfterAdding = ConsumerHashAssignmentsSnapshot.mergeOverlappingRanges( - impactedConsumersAfterAdding.getAddedHashRanges().values().stream() - .map(UpdatedHashRanges::asRanges) - .flatMap(List::stream).collect(Collectors.toCollection(TreeSet::new))); - assertThat(addedConsumerRanges) - .describedAs("all added ranges should be the same as the ranges of the added consumer") - .containsExactlyElementsOf(allAddedRangesAfterAdding); assignmentsBefore = assignmentsAfter; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java index 93e53aba7b3..5c886b6eec9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshotTest.java @@ -129,16 +129,14 @@ public class ConsumerHashAssignmentsSnapshotTest { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); - Range notModifiedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); - mappingBefore.add(new HashRangeAssignment(notModifiedRange, consumer1)); - mappingAfter.add(new HashRangeAssignment(notModifiedRange, consumer1)); + mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); + mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty(); - assertThat(impactedConsumers.getAddedHashRanges()).isEmpty(); } @Test @@ -146,59 +144,49 @@ public class ConsumerHashAssignmentsSnapshotTest { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); - Range reAssignedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); Consumer consumer2 = createMockConsumer("consumer2"); - mappingBefore.add(new HashRangeAssignment(reAssignedRange, consumer1)); - mappingAfter.add(new HashRangeAssignment(reAssignedRange, consumer2)); + mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); + mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer2)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, UpdatedHashRanges.of(List.of(reAssignedRange)))); - assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer2, UpdatedHashRanges.of(List.of(reAssignedRange))) - ); + Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 5))))); } @Test - public void testResolveConsumerUpdatedHashRanges_RangeAdded() { + public void testResolveConsumerRemovedHashRanges_RangeAdded() { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); - Range addedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); - mappingAfter.add(new HashRangeAssignment(addedRange, consumer1)); + mappingAfter.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).isEmpty(); - assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, UpdatedHashRanges.of(List.of(addedRange))) - ); } @Test - public void testResolveConsumerRemovedHashRanges_RangeUpdated() { + public void testResolveConsumerRemovedHashRanges_RangeRemoved() { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); - Range removedRange = Range.of(1, 5); Consumer consumer1 = createMockConsumer("consumer1"); - mappingBefore.add(new HashRangeAssignment(removedRange, consumer1)); + mappingBefore.add(new HashRangeAssignment(Range.of(1, 5), consumer1)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, UpdatedHashRanges.of(List.of(removedRange)))); - assertThat(impactedConsumers.getAddedHashRanges()).isEmpty(); + Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(1, 5))))); } @Test - public void testResolveConsumerUpdatedHashRanges_OverlappingRanges() { + public void testResolveConsumerRemovedHashRanges_OverlappingRanges() { List<HashRangeAssignment> mappingBefore = new ArrayList<>(); List<HashRangeAssignment> mappingAfter = new ArrayList<>(); @@ -208,12 +196,9 @@ public class ConsumerHashAssignmentsSnapshotTest { mappingAfter.add(new HashRangeAssignment(Range.of(3, 7), consumer2)); ImpactedConsumersResult impactedConsumers = - ConsumerHashAssignmentsSnapshot.resolveConsumerUpdatedHashRanges(mappingBefore, mappingAfter); + ConsumerHashAssignmentsSnapshot.resolveConsumerRemovedHashRanges(mappingBefore, mappingAfter); assertThat(impactedConsumers.getRemovedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer1, UpdatedHashRanges.of(List.of(Range.of(3, 5))))); - assertThat(impactedConsumers.getAddedHashRanges()).containsExactlyInAnyOrderEntriesOf( - Map.of(consumer2, UpdatedHashRanges.of(List.of(Range.of(3, 7)))) - ); + Map.of(consumer1, RemovedHashRanges.of(List.of(Range.of(3, 5))))); } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 80b5070077d..af7c32192c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -30,7 +30,6 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; @@ -920,136 +919,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { consumer.close(); } - /* - Verifies https://github.com/apache/pulsar/issues/23845 issue fixed - - The scenario of verification - - Preconditions: - - 1. Consumer "consumer" up and running - 2. Producer "producer" connected to broker - - Steps to reproduce: - 1. "producer" send message with key "testMessageKey" - Example: "testMessage" hash is always 124 - - 2. "consumer" receives BUT DO NOT acknowledge message with key "testMessageKey" - Example: "consumer" hash range: [0-256]. "testMessage" hash range: [0-256] - - 3. Add more consumers until hash range of message with key "testMessageKey" removed from "consumer" hash ranges - Example: "consumer" hash range: [0-123]. "consumer N" hash range: [124-136]. "testMessage" hash range: [124-136] - - 4. Stop the added consumers until message key "testMessageKey" hash range moved to "consumer" hash ranges - Example: "consumer" hash range: [0-128]. "testMessage" hash range: [0-128] - - 5. Add more consumers until hash range of message with key "testMessageKey" removed from "consumer" hash ranges - Example: "consumer" hash range: [0-96]. "consumer N" hash range: [121-154]. "testMessage" hash range: [121-154] - - 6. Stop "consumer" - - Actual result for bug: - No message with key "testMessageKey" received by consumer who owns message's hash range - - Expected result: - The message with key "testMessageKey" received by consumer who owns message's hash range - Example: "consumer N" with hash range [121-154] received the message - - */ - @Test - void testMessageDeliveredFromDrainingHashes() throws PulsarClientException { - String messageKey = "testMessageKey"; - String topic = "testMessageDeliveredFromDrainingHashes" + UUID.randomUUID(); - - List<Consumer<Integer>> consumers = new ArrayList<>(); - - @Cleanup - Consumer<Integer> consumer = createConsumer(topic); - String initialOwnerName = consumer.getConsumerName(); - - @Cleanup - Producer<Integer> producer = createProducer(topic, false); - producer.newMessage().key(messageKey).value(1).send(); - - // receive but not acknowledge the message to leave it in pending acks - Message<Integer> received = consumer.receive(); - assertThat(received.getKey()).isEqualTo(messageKey); - - StickyKeyDispatcher dispatcher = getDispatcher(topic, SUBSCRIPTION_NAME); - StickyKeyConsumerSelector selector = dispatcher.getSelector(); - int messageKeyHash = selector.makeStickyKeyHash(messageKey.getBytes(StandardCharsets.UTF_8)); - - try { - // add new consumers until hash range of the initial message owner moved - Pair<String, List<Consumer<Integer>>> addedConsumers = addConsumersUntilOwnerChanged( - topic, initialOwnerName, messageKeyHash, selector - ); - String currentOwnerName = addedConsumers.getKey(); - consumers.addAll(addedConsumers.getValue()); - - // returning hash range to the initial owner - for (int i = consumers.size() - 1; i > -1 && !initialOwnerName.equals(currentOwnerName); i--) { - consumers.remove(i).close(); - currentOwnerName = findOwnerName(selector, messageKeyHash); - } - - // add new consumers until hash range of the initial message owner moved - Pair<String, List<Consumer<Integer>>> addedConsumersAfter = addConsumersUntilOwnerChanged( - topic, initialOwnerName, messageKeyHash, selector - ); - - String currentOwnerNameAfter = addedConsumersAfter.getKey(); - consumers.addAll(addedConsumersAfter.getValue()); - - // remove the initial owner - consumer.close(); - - // verify the message sent to new consumer which owns hash range - Optional<Consumer<Integer>> theNewOwnerMaybe = consumers - .stream() - .filter(c -> c.getConsumerName().equals(currentOwnerNameAfter)) - .findAny(); - - assertThat(theNewOwnerMaybe).isPresent(); - - Consumer<Integer> theNewOwner = theNewOwnerMaybe.get(); - - Message<Integer> message = theNewOwner.receive(5, TimeUnit.SECONDS); - assertThat(message).describedAs("Message with key " + messageKey - + " expected to be delivered to consumer " + theNewOwner.getConsumerName()).isNotNull(); - assertThat(message.getKey()).isEqualTo(messageKey); - } finally { - for (Consumer<Integer> c : consumers) { - c.close(); - } - } - } - - private Pair<String, List<Consumer<Integer>>> addConsumersUntilOwnerChanged( - String topic, - String initialOwnerName, - int messageKeyHash, - StickyKeyConsumerSelector selector - ) throws PulsarClientException { - String currentOwnerName; - List<Consumer<Integer>> consumers = new ArrayList<>(); - do { - Consumer<Integer> addedC = createConsumer(topic); - consumers.add(addedC); - currentOwnerName = findOwnerName(selector, messageKeyHash); - } while (initialOwnerName.equals(currentOwnerName)); - return Pair.of(currentOwnerName, consumers); - } - - private String findOwnerName(StickyKeyConsumerSelector selector, int hash) { - return selector.getConsumerKeyHashRanges().entrySet().stream() - .filter(entry -> - entry.getValue().stream().anyMatch(range -> range.contains(hash)) - ) - .findAny().orElseThrow(() -> new IllegalArgumentException("No owner for the hash " + hash)) - .getKey().consumerName(); - } - @Test(dataProvider = "currentImplementationType") public void testAttachKeyToMessageMetadata(KeySharedImplementationType impl) throws PulsarClientException { String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
