Denovo1998 opened a new pull request, #24864:
URL: https://github.com/apache/pulsar/pull/24864

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see 
*[Guideline - Pulsar PR Naming 
Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*. 
   
     - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from 
multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   -->
   
   <!-- Either this PR fixes an issue, -->
   
   Fixes #24846
   
   <!-- or this PR is one task of an issue -->
   
   Main Issue: #xyz
   
   <!-- If the PR belongs to a PIP, please add the PIP link here -->
   
   PIP: #xyz 
   
   <!-- Details of when a PIP is required and how the PIP process work, please 
see: https://github.com/apache/pulsar/blob/master/pip/README.md -->
   
   ### Motivation
   
   The test 
PublishRateLimiterOverconsumingTest.testOverconsumingTokensWithBrokerPublishRateLimiter
 was intermittently failing under concurrent + batching scenarios. Failures 
were caused by:
   - Warm-up and window misalignment: the first effective “second” often spans 
partial time across two real seconds, producing a very low first bucket and a 
very high second bucket (e.g., 24 and 1212).
   - Bursty traffic: multiple independent producers with batching flushes lead 
to token bursts that concentrate into a single second.
   - Average rate computed from total elapsed time: when bursts make the 
overall processing time shorter, avg = totalMessages/timeSinceStart becomes 
artificially high (e.g., 750 vs 500) even though the rate limiter is fine.
   
   These issues are the same class as addressed for Dispatch in PR #24012. The 
test needs to measure rate more robustly to avoid false negatives while still 
detecting real regressions.
   
   ### Modifications
   
   This change makes the test resilient by aligning measurement and smoothing 
per-second variability, following the approach used for Dispatch in PR #24012:
   
   - Align measurement start to the first received message:
     - Start counting only after the first message arrives (set startTimeNanos 
in the messageListener). This removes the initial misalignment in the first 
bucket.
   
   - Use windowed average for “overall average”:
     - Compute the test’s “average rate” from the collected per-second windows 
(skip the first second and take up to durationSeconds windows), instead of 
using totalMessages/elapsedTime, which is sensitive to overall time shortening 
due to bursts.
     - Assertion: the window-based average must be within ±40% of the 
configured rate.
   
   - Use adjacent 2-second averages for per-second stability:
     - For each pair of adjacent seconds (skipping the head and tail pairs), 
assert that the mean of the two is within ±55% of the configured rate. This 
compensates for a low bucket followed by a high bucket caused by bursts or 
warm-up misalignment.
     - If there are too few windows (< 4), fall back to a single-second check 
(skip head and tail) with a ±50% tolerance to avoid spurious failures.
   
   - Make wait condition robust:
     - Wait for all messages to be consumed rather than requiring a particular 
number of windows. This avoids waiting on windows in fast runs.
   
   - Include trailing partial window:
     - Add the last partial bucket (if any) to the snapshot to not silently 
drop the tail.
   
   - Non-functional/logging:
     - Keep the 500 ms polling for the rate tracker but only emit a per-second 
rate on full-second boundaries using lastCalculatedSecond.
     - Additional comments and logs for clarity.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE -->
   
   <!--
   After opening this PR, the build in apache/pulsar will fail and instructions 
will
   be provided for opening a PR in the PR author's forked repository.
   
   apache/pulsar pull requests should be first tested in your own fork since 
the 
   apache/pulsar CI based on GitHub Actions has constrained resources and quota.
   GitHub Actions provides separate quota for pull requests that are executed 
in 
   a forked repository.
   
   The tests will be run in the forked repository until all PR review comments 
have
   been handled, the tests pass and the PR is approved by a reviewer.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to