BewareMyPower commented on code in PR #23915:
URL: https://github.com/apache/pulsar/pull/23915#discussion_r1948370243


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,12 +439,18 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
                             permitsForConsumer.computeIfAbsent(consumer,
                                     k -> new MutableInt(getAvailablePermits(k)));
                     // a consumer was found for the sticky key hash and the entry can be dispatched
-                    if (permits.intValue() > 0
-                            && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {

Review Comment:
   How about this patch?
   
   ```diff
   index 374111267e..ca18d9f599 100644
   --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
   +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
   @@ -424,17 +424,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                }
                int stickyKeyHash = getStickyKeyHash(entry);
                Consumer consumer = null;
   -            MutableBoolean blockedByHash = null;
   +            boolean blockedByHash = false;
                boolean dispatchEntry = false;
                // check if the hash is already blocked
                boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
   +
                if (!hashIsAlreadyBlocked) {
                    consumer = selector.select(stickyKeyHash);
                    if (consumer != null) {
                        if (lookAheadAllowed) {
                            consumersForEntriesForLookaheadCheck.add(consumer);
                        }
   -                    blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
   +                    final var canUpdateBlockedByHash = lookAheadAllowed && readType == ReadType.Normal;
                        MutableInt permits =
                                permitsForConsumer.computeIfAbsent(consumer,
                                        k -> new MutableInt(getAvailablePermits(k)));
   @@ -446,10 +447,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                                permits.decrement();
                                // allow the entry to be dispatched
                                dispatchEntry = true;
   -                        } else {
   -                            if (blockedByHash != null) {
   -                                blockedByHash.setTrue();
   -                            }
   +                        } else if (canUpdateBlockedByHash) {
   +                            blockedByHash = true;
                            }
                        }
                    }
   @@ -464,7 +463,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                        // the hash is blocked, add it to the set of blocked hashes
                        alreadyBlockedHashes.add(stickyKeyHash);
                    }
   -                if (blockedByHash != null && blockedByHash.isTrue()) {
   +                if (blockedByHash) {
                        // the entry is blocked by hash, add the consumer to the blocked set
                        blockedByHashConsumers.add(consumer);
                    }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,12 +439,18 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
                             permitsForConsumer.computeIfAbsent(consumer,
                                     k -> new MutableInt(getAvailablePermits(k)));
                     // a consumer was found for the sticky key hash and the entry can be dispatched
-                    if (permits.intValue() > 0
-                            && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {

Review Comment:
   How about this patch?
   
   ```diff
   index 374111267e..ca18d9f599 100644
   --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
   +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
   @@ -424,17 +424,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                }
                int stickyKeyHash = getStickyKeyHash(entry);
                Consumer consumer = null;
   -            MutableBoolean blockedByHash = null;
   +            boolean blockedByHash = false;
                boolean dispatchEntry = false;
                // check if the hash is already blocked
                boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
   +
                if (!hashIsAlreadyBlocked) {
                    consumer = selector.select(stickyKeyHash);
                    if (consumer != null) {
                        if (lookAheadAllowed) {
                            consumersForEntriesForLookaheadCheck.add(consumer);
                        }
   -                    blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
   +                    final var canUpdateBlockedByHash = lookAheadAllowed && readType == ReadType.Normal;
                        MutableInt permits =
                                permitsForConsumer.computeIfAbsent(consumer,
                                        k -> new MutableInt(getAvailablePermits(k)));
   @@ -446,10 +447,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                                permits.decrement();
                                // allow the entry to be dispatched
                                dispatchEntry = true;
   -                        } else {
   -                            if (blockedByHash != null) {
   -                                blockedByHash.setTrue();
   -                            }
   +                        } else if (canUpdateBlockedByHash) {
   +                            blockedByHash = true;
                            }
                        }
                    }
   @@ -464,7 +463,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                        // the hash is blocked, add it to the set of blocked hashes
                        alreadyBlockedHashes.add(stickyKeyHash);
                    }
   -                if (blockedByHash != null && blockedByHash.isTrue()) {
   +                if (blockedByHash) {
                        // the entry is blocked by hash, add the consumer to the blocked set
                        blockedByHashConsumers.add(consumer);
                    }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to