[
https://issues.apache.org/jira/browse/ARTEMIS-5536?focusedWorklogId=972789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-972789
]
ASF GitHub Bot logged work on ARTEMIS-5536:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jun/25 15:00
Start Date: 12/Jun/25 15:00
Worklog Time Spent: 10m
Work Description: tabish121 commented on code in PR #5760:
URL: https://github.com/apache/activemq-artemis/pull/5760#discussion_r2142987459
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java:
##########
@@ -307,14 +315,28 @@ public int handleDeliver(MessageReference reference,
ICoreMessage message) {
reference.setProtocolData(MessageId.class,
dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
// Prevent races with other updates that can lead to credit going
negative and starving consumers.
- currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+ currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Decrementing credit, current={}",
currentWindow.get());
Review Comment:
This log says decrementing but it already decremented so it should either be
moved up, or reworded to indicate its reporting the post op result
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java:
##########
@@ -413,6 +453,9 @@ public boolean hasCredits() {
public void processMessagePull(MessagePull messagePull) throws Exception {
currentWindow.incrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Incrementing credit for processMessagePull, current =
{}", currentWindow.get());
Review Comment:
It was already incremented, so the value reported is post change, where the
current log makes it seems as if its reports the pre-change value
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java:
##########
@@ -438,6 +493,9 @@ public org.apache.activemq.command.ActiveMQDestination
getOpenwireDestination()
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
this.currentWindow.set(prefetchSize);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Updating credits with a new prefetchSize",
prefetchSize, new Exception("trace"));
Review Comment:
Same as above, the log message is a bit misleading
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java:
##########
@@ -307,14 +315,28 @@ public int handleDeliver(MessageReference reference,
ICoreMessage message) {
reference.setProtocolData(MessageId.class,
dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
// Prevent races with other updates that can lead to credit going
negative and starving consumers.
- currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+ currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Decrementing credit, current={}",
currentWindow.get());
+ }
return size;
} catch (Throwable t) {
logger.warn("Error during message dispatch", t);
return 0;
}
}
+ static int decrementWindow(int value) {
+ if (value > 0) {
+ return value - 1;
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("preventing a negative value={}", value, new
Exception("trace"));
Review Comment:
Would be nice if the log was a tad more descriptive, "preventing the credit
window from going negative" or some such just to illuminate the future reader.
Issue Time Tracking
-------------------
Worklog Id: (was: 972789)
Time Spent: 0.5h (was: 20m)
> Openwire Flowcontrol Hardening
> ------------------------------
>
> Key: ARTEMIS-5536
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5536
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> - AMQConsumer will discount credits ahead of taking them on rollback, that
> would lead to negative values
> - AMQConsumer.close() is blocking an operation
> - fixing other branches for flow control
> This is fixing RedeliveryPolicyTest.testCanRollbackPastPrefetch and improving
> reliability in other openwire tests
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact