Denovo1998 commented on code in PR #25076:
URL: https://github.com/apache/pulsar/pull/25076#discussion_r2623327409
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java:
##########
@@ -200,7 +203,12 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
delayedMessagesCount.addAndGet(-n);
n = 0;
}
- if (n <= 0) {
+ if (n == 0) {
+ break;
+ } else if (n < 0) {
+ // should not go into this situation
+ log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}",
+ dispatcher.getName(), n);
Review Comment:
About the new n < 0 branch: this should be unreachable in normal flow. One
potential way to hit it is int overflow from int cardinality = (int)
entryIds.getLongCardinality().
Would it be better to keep cardinality as long (and compare cardinality <=
(long) n) to eliminate overflow, instead of only logging when n < 0?
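To illustrate the overflow concern, here is a minimal, self-contained sketch. The class and method names (`CardinalityCompare`, `narrowed`, `fitsWithin`) are hypothetical and not part of the PR; the sketch only assumes that the cardinality arrives as a `long` and that `n` is an `int`, as in the snippet quoted above. Whether a single bitmap can realistically exceed `Integer.MAX_VALUE` entries in this tracker is not something the sketch tries to answer.

```java
// Hypothetical illustration; not code from InMemoryDelayedDeliveryTracker.
class CardinalityCompare {
    // A narrowing cast keeps only the low 32 bits, so a large count can wrap negative.
    static int narrowed(long cardinality) {
        return (int) cardinality;
    }

    // Comparing in long space widens n instead of truncating the cardinality.
    static boolean fitsWithin(long cardinality, int n) {
        return cardinality <= (long) n;
    }

    public static void main(String[] args) {
        long big = (1L << 31) + 5; // just past Integer.MAX_VALUE
        int n = 10;
        System.out.println(narrowed(big));      // negative after the cast
        System.out.println(narrowed(big) <= n); // true: the int comparison is misled
        System.out.println(fitsWithin(big, n)); // false: the long comparison is correct
    }
}
```

With the `long` comparison, subtracting the cardinality from `n` only happens when it is known to fit, so `n` should never go negative through this path.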
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java:
##########
@@ -274,4 +274,69 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc
tracker.close();
}
+ @Test(dataProvider = "delayedTracker")
+ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDeliveryTracker tracker) throws Exception {
+ assertFalse(tracker.hasMessageAvailable());
+
+ // case1: addMessage() with duplicate entryId,
+ // getScheduledMessages() enter "cardinality <= n" and make tracker empty
+ assertTrue(tracker.addMessage(1, 1, 10));
+ assertTrue(tracker.addMessage(1, 2, 20));
+ assertTrue(tracker.addMessage(1, 2, 20));
+ assertTrue(tracker.addMessage(1, 3, 40));
+ assertTrue(tracker.addMessage(1, 4, 50));
+
+ clockTime.set(50);
+ assertTrue(tracker.hasMessageAvailable());
+ assertEquals(tracker.getNumberOfDelayedMessages(), 4L);
+ assertEquals(tracker.delayedMessageMap.size(), 4L);
+
+ Set<Position> scheduled = tracker.getScheduledMessages(10);
+ assertEquals(scheduled.size(), 4);
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0L);
+
+
+
+ // case2: addMessage() with duplicate entryId,
Review Comment:
In case2 the comment says it enters cardinality > n, but with
getScheduledMessages(10) and 4 unique entryIds it should hit the cardinality <=
n branch. Could we adjust the comment to match the scenario (case3 seems to be
the one exercising cardinality > n)?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java:
##########
Review Comment:
One thought: should updateTimer() and checkAndUpdateHighest(deliverAt) run
only when we actually insert a new entryId?
With the current structure, duplicate addMessage() calls still update
highestDeliveryTimeTracked / messagesHaveFixedDelay, which could disable the
fixed-delay optimization even though the tracker state didn’t change.
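A rough sketch of what guarding on "actually inserted" could look like. `DedupTrackerSketch` is a hypothetical, heavily simplified stand-in: it uses a `HashSet` instead of the tracker's real data structures, omits `updateTimer()` entirely, and its `checkAndUpdateHighest` body is an assumption about how the fixed-delay check behaves, not a copy of the Pulsar code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplified tracker; field names mirror the review comment only for readability.
class DedupTrackerSketch {
    private final Set<Long> entryIds = new HashSet<>();
    private final long tickTimeMillis;
    long highestDeliveryTimeTracked = 0;
    boolean messagesHaveFixedDelay = true;
    long delayedMessagesCount = 0;

    DedupTrackerSketch(long tickTimeMillis) {
        this.tickTimeMillis = tickTimeMillis;
    }

    boolean addMessage(long entryId, long deliverAt) {
        if (!entryIds.add(entryId)) {
            // Duplicate: the entry is already tracked, so leave all derived state untouched.
            return true;
        }
        delayedMessagesCount++;
        checkAndUpdateHighest(deliverAt);
        return true;
    }

    // Assumed behavior: an out-of-order deliverAt disables the fixed-delay optimization.
    private void checkAndUpdateHighest(long deliverAt) {
        if (deliverAt < highestDeliveryTimeTracked - tickTimeMillis) {
            messagesHaveFixedDelay = false;
        }
        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt);
    }
}
```

In this sketch, re-adding an earlier entry after a later one has been tracked leaves `messagesHaveFixedDelay` and the count unchanged, which is the behavior the question above is asking about.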