equanz commented on code in PR #23441:
URL: https://github.com/apache/pulsar/pull/23441#discussion_r1797140960
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java:
##########
@@ -76,18 +78,36 @@ public CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer
             ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
             // Insert multiple points on the hash ring for every consumer
             // The points are deterministically added based on the hash of the consumer name
+            int hashPointsAdded = 0;
+            int hashPointCollisions = 0;
             for (int i = 0; i < numberOfPoints; i++) {
                 int consumerNameIndex =
                         consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
                 int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i);
-                // When there's a collision, the new consumer will replace the old one.
-                // This is a rare case, and it is acceptable to replace the old consumer since there
-                // are multiple points for each consumer. This won't affect the overall distribution significantly.
-                ConsumerIdentityWrapper removed = hashRing.put(hash, consumerIdentityWrapper);
-                if (removed != null) {
-                    consumerNameIndexTracker.decreaseConsumerRefCount(removed);
+                // When there's a collision, the entry won't be added to the hash ring.
+                // This isn't a problem with the consumerNameIndexTracker solution since the collisions won't align
+                // for all hash ring points when using the same consumer name. This won't affect the overall
+                // distribution significantly when the number of hash ring points is sufficiently large (>100).
+                ConsumerIdentityWrapper existing = hashRing.putIfAbsent(hash, consumerIdentityWrapper);
+                if (existing != null) {
+                    hashPointCollisions++;
+                    // reduce the ref count which was increased before adding since the consumer was not added
+                    consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
+                } else {
+                    hashPointsAdded++;
                 }
             }
+            if (hashPointsAdded == 0) {
+                log.error("Failed to add consumer '{}' to the hash ring. There were {} collisions. Consider increasing "
+                                + "the number of points ({}) per consumer by setting "
+                                + "subscriptionKeySharedConsistentHashingReplicaPoints={}",
+                        consumer, hashPointCollisions, numberOfPoints,
+                        Math.max((int) (numberOfPoints * 1.5d), numberOfPoints + 1));
+            }
Review Comment:
This could be handled outside the scope of this PR, but I think it would be
better if this state could be reported to the affected consumer.
(A consumer has no way of knowing that it has no hash points, does it?)
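
To make the case concrete, here is a minimal sketch of what I mean. It is not the broker code: `HashRingSketch`, `addConsumer(String, int[])` and `hasPoints` are hypothetical names, and a plain `TreeMap<Integer, String>` stands in for the real hash ring. It only shows that with `putIfAbsent` a consumer whose points all collide ends up with no entries on the ring, while the caller of `addConsumer` is the only party that sees the count.

```java
import java.util.TreeMap;

public class HashRingSketch {
    // Hypothetical stand-in for the broker's hash ring: hash point -> consumer name.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    /** Adds the given points for a consumer and returns how many were actually added. */
    public int addConsumer(String consumer, int[] points) {
        int added = 0;
        for (int point : points) {
            // putIfAbsent keeps the existing owner on a collision and returns it;
            // it returns null only when the point was free and is now ours.
            if (ring.putIfAbsent(point, consumer) == null) {
                added++;
            }
        }
        return added;
    }

    /** Returns true if at least one ring point maps to the consumer. */
    public boolean hasPoints(String consumer) {
        return ring.containsValue(consumer);
    }

    public static void main(String[] args) {
        HashRingSketch sketch = new HashRingSketch();
        int[] points = {10, 20, 30};
        int first = sketch.addConsumer("c1", points);
        int second = sketch.addConsumer("c2", points); // every point collides with c1
        // c2 is "connected" but owns nothing on the ring, so it would never be selected.
        System.out.println(first + " " + second + " " + sketch.hasPoints("c2"));
    }
}
```

In this sketch the second consumer is accepted without error yet owns no points, which is the state I think should be surfaced to the consumer rather than only logged on the broker.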