BewareMyPower commented on code in PR #23915:
URL: https://github.com/apache/pulsar/pull/23915#discussion_r1948370243
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,12 +439,18 @@ private Map<Consumer, List<Entry>>
filterAndGroupEntriesForDispatching(List<Entr
permitsForConsumer.computeIfAbsent(consumer,
k -> new
MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the
entry can be dispatched
- if (permits.intValue() > 0
- && canDispatchEntry(consumer, entry, readType,
stickyKeyHash, blockedByHash)) {
Review Comment:
How about this patch?
```diff
index 374111267e..ca18d9f599 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -424,17 +424,18 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = null;
- MutableBoolean blockedByHash = null;
+ boolean blockedByHash = false;
boolean dispatchEntry = false;
// check if the hash is already blocked
boolean hashIsAlreadyBlocked =
alreadyBlockedHashes.contains(stickyKeyHash);
+
if (!hashIsAlreadyBlocked) {
consumer = selector.select(stickyKeyHash);
if (consumer != null) {
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
- blockedByHash = lookAheadAllowed && readType ==
ReadType.Normal ? new MutableBoolean(false) : null;
+ final var canUpdateBlockedByHash = lookAheadAllowed &&
readType == ReadType.Normal;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new
MutableInt(getAvailablePermits(k)));
@@ -446,10 +447,8 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
- } else {
- if (blockedByHash != null) {
- blockedByHash.setTrue();
- }
+ } else if (canUpdateBlockedByHash) {
+ blockedByHash = true;
}
}
}
@@ -464,7 +463,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// the hash is blocked, add it to the set of blocked
hashes
alreadyBlockedHashes.add(stickyKeyHash);
}
- if (blockedByHash != null && blockedByHash.isTrue()) {
+ if (blockedByHash) {
// the entry is blocked by hash, add the consumer to
the blocked set
blockedByHashConsumers.add(consumer);
}
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,12 +439,18 @@ private Map<Consumer, List<Entry>>
filterAndGroupEntriesForDispatching(List<Entr
permitsForConsumer.computeIfAbsent(consumer,
k -> new
MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the
entry can be dispatched
- if (permits.intValue() > 0
- && canDispatchEntry(consumer, entry, readType,
stickyKeyHash, blockedByHash)) {
Review Comment:
How about this patch?
```diff
index 374111267e..ca18d9f599 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -424,17 +424,18 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = null;
- MutableBoolean blockedByHash = null;
+ boolean blockedByHash = false;
boolean dispatchEntry = false;
// check if the hash is already blocked
boolean hashIsAlreadyBlocked =
alreadyBlockedHashes.contains(stickyKeyHash);
+
if (!hashIsAlreadyBlocked) {
consumer = selector.select(stickyKeyHash);
if (consumer != null) {
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
- blockedByHash = lookAheadAllowed && readType ==
ReadType.Normal ? new MutableBoolean(false) : null;
+ final var canUpdateBlockedByHash = lookAheadAllowed &&
readType == ReadType.Normal;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new
MutableInt(getAvailablePermits(k)));
@@ -446,10 +447,8 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
- } else {
- if (blockedByHash != null) {
- blockedByHash.setTrue();
- }
+ } else if (canUpdateBlockedByHash) {
+ blockedByHash = true;
}
}
}
@@ -464,7 +463,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// the hash is blocked, add it to the set of blocked
hashes
alreadyBlockedHashes.add(stickyKeyHash);
}
- if (blockedByHash != null && blockedByHash.isTrue()) {
+ if (blockedByHash) {
// the entry is blocked by hash, add the consumer to
the blocked set
blockedByHashConsumers.add(consumer);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]