Denovo1998 commented on code in PR #25076:
URL: https://github.com/apache/pulsar/pull/25076#discussion_r2623327409


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java:
##########
@@ -200,7 +203,12 @@ public NavigableSet<Position> getScheduledMessages(int 
maxMessages) {
                     delayedMessagesCount.addAndGet(-n);
                     n = 0;
                 }
-                if (n <= 0) {
+                if (n == 0) {
+                    break;
+                } else if (n < 0) {
+                    // should not go into this situation
+                    log.error("[{}] Delayed message tracker 
getScheduledMessages should not < 0, number is: {}",
+                            dispatcher.getName(), n);

Review Comment:
   About the new n < 0 branch: this should be unreachable in normal flow. One 
potential way to hit it is int overflow from int cardinality = (int) 
entryIds.getLongCardinality().
   
   Would it be better to keep cardinality as long (and compare cardinality <= 
(long) n) to eliminate overflow, instead of only logging when n < 0?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java:
##########
@@ -274,4 +274,69 @@ public void 
testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc
         tracker.close();
     }
 
+    @Test(dataProvider = "delayedTracker")
+    public void 
testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDeliveryTracker 
tracker) throws Exception {
+        assertFalse(tracker.hasMessageAvailable());
+
+        // case1: addMessage() with duplicate entryId,
+        // getScheduledMessages() enter "cardinality <= n" and make tracker 
empty
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(1, 2, 20));
+        assertTrue(tracker.addMessage(1, 2, 20));
+        assertTrue(tracker.addMessage(1, 3, 40));
+        assertTrue(tracker.addMessage(1, 4, 50));
+
+        clockTime.set(50);
+        assertTrue(tracker.hasMessageAvailable());
+        assertEquals(tracker.getNumberOfDelayedMessages(), 4L);
+        assertEquals(tracker.delayedMessageMap.size(), 4L);
+
+        Set<Position> scheduled = tracker.getScheduledMessages(10);
+        assertEquals(scheduled.size(), 4);
+        assertEquals(tracker.getNumberOfDelayedMessages(), 0L);
+
+
+
+        // case2: addMessage() with duplicate entryId,

Review Comment:
   In case2 the comment says it enters cardinality > n, but with 
getScheduledMessages(10) and 4 unique entryIds it should hit the cardinality <= 
n branch. Could we adjust the comment to match the scenario (case3 seems to be 
the one exercising cardinality > n)?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java:
##########


Review Comment:
   One thought: should updateTimer() and checkAndUpdateHighest(deliverAt) run 
only when we actually insert a new entryId?
   
   With the current structure, duplicate addMessage() calls still update 
highestDeliveryTimeTracked / messagesHaveFixedDelay, which could disable the 
fixed-delay optimization even though the tracker state didn’t change.”



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to