lhotari commented on PR #25805: URL: https://github.com/apache/pulsar/pull/25805#issuecomment-4516637169
> Let increasing `incomingMessagesSize` and decreasing `incomingMessagesSize` run in the same thread > * Since it is hard to add a test to reproduce the issue, I skipped writing the test Claude Code local review provided this type of analysis about this. > [INTENT MISMATCH] Race not fully eliminated — multiple decreaseIncomingMessageSize sites still run on the calling thread > The PR description says "Let increasing incomingMessagesSize and decreasing incomingMessagesSize run in the same thread", but only 4 of ~9 call sites are migrated. Not wrapped: > - ConsumerImpl.removeExpiredMessagesFromQueue (ConsumerImpl.java:2985) — runs from negative-ack / expiry tasks > - MultiTopicsConsumerImpl.removeExpiredMessagesFromQueue (MultiTopicsConsumerImpl.java:953) > - MultiTopicsConsumerImpl.messageProcessed (MultiTopicsConsumerImpl.java:351) — although in practice only invoked from notifyPendingBatchReceivedCallBack which already runs on > internalPinnedExecutor, so this is likely safe; worth a comment explaining the asymmetry. > - MultiTopicsConsumerImpl.internalReceiveAsync (MultiTopicsConsumerImpl.java:468) — already inside internalPinnedExecutor.execute(...), so OK. > The first two can still hit the original race. Consider routing all decrease* through the pinned executor, or push the serialization into ConsumerBase.decreaseIncomingMessageSize > itself so callers don't have to remember. Is it necessary to handle these findings? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
