[
https://issues.apache.org/activemq/browse/AMQ-2567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=56925#action_56925
]
Rudolf Janz commented on AMQ-2567:
----------------------------------
So I added printouts in the client (Expecting/Read) and in
PrefetchSubscription for all acks.
C1 and C2 are the consumers; n is the number of acked messages. This is how it happens:
Expecting Msg1
Read Msg1
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 1
Expecting Msg2
C1: n:1 isDeliveredAck(Before): *2* (This is wrong: it should be in the same
state as it was before message 1.)
C1: n:1 isDeliveredAck(After): 2
Read Msg2
C1: n:1 isStandardAckXA(Before): 2
C1: n:1 isStandardAckXA(After): 2
Expecting Msg3
C1: n:1 isDeliveredAck(Before): 2
C1: n:1 isDeliveredAck(After): 2
Read Msg3
C1: n:1 isStandardAckXA(Before): 2
C1: n:1 isStandardAckXA(After): 2
Expecting Msg4
Read Msg4
C1: n:1 isDeliveredAck(Before): 3
C1: n:1 isDeliveredAck(After): 3
C1: n:1 isStandardAckXA(Before): 3
C1: n:1 isStandardAckXA(After): 3
Expecting Msg5
Read Msg8
C2: n:1 isDeliveredAck(Before): 1
C2: n:1 isDeliveredAck(After): 1
C2: n:1 isStandardAckXA(Before): 1
C2: n:1 isStandardAckXA(After): 1
I think the problem is in the standard ack handling: it should decrement
prefetchExtension. But I guess I am forgetting something. (In the test case I
extended the transaction to receive two messages.)
{code}
if (ack.isStandardAck()) {
    ...
    if (context.isInTransaction()) {
        // extend prefetch window only if not a pulling
        // consumer
        System.out.println(prefix + "isStandardAckXA(Before): " + prefetchExtension);
        //RJ if (getPrefetchSize() != 0) {
        //RJ     prefetchExtension = Math.max(prefetchExtension, index);
        //RJ }
        //RJ do the same as in the non transacted case
        prefetchExtension = Math.max(0, prefetchExtension - index);
        System.out.println(prefix + "isStandardAckXA(After): " + prefetchExtension);
    } else {
        System.out.println(prefix + "isStandardAck(Before): " + prefetchExtension);
        prefetchExtension = Math.max(0, prefetchExtension - index);
        System.out.println(prefix + "isStandardAck(After): " + prefetchExtension);
    }
}
{code}
With this change the output becomes:
Expecting Msg1
Read Msg1
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg2
Read Msg2
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg3
Read Msg3
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg4
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
Read Msg4
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg5
C2: n:1 isDeliveredAck(Before): 1
C2: n:1 isDeliveredAck(After): 1
Read Msg5
C2: n:1 isStandardAckXA(Before): 1
C2: n:1 isStandardAckXA(After): 0
Expecting Msg6
Read Msg6
Expecting Msg7
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
Read Msg7
C1: n:1 isDeliveredAck(Before): 2
C1: n:1 isDeliveredAck(After): 2
C1: n:2 isStandardAckXA(Before): 2
C1: n:2 isStandardAckXA(After): 0
Expecting Msg8
Read Msg8
C2: n:1 isDeliveredAck(Before): 1
C2: n:1 isDeliveredAck(After): 1
C2: n:1 isStandardAckXA(Before): 1
C2: n:1 isStandardAckXA(After): 0
This looks OK to me now.
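To make the difference between the two update rules easier to see in isolation, here is a minimal standalone sketch. It is not the real PrefetchSubscription code: the class and method names are made up for illustration, and it assumes that index equals n, the number of messages covered by the ack. The "original" rule keeps the getPrefetchSize() != 0 guard from the commented-out lines, so for a zero-prefetch consumer it leaves the extension untouched.

```java
public class PrefetchExtensionSketch {

    // Transacted branch as it was before the change (hypothetical standalone form):
    // the extension is only touched for non-pulling consumers, and it never shrinks.
    public static int transactedOriginal(int prefetchSize, int prefetchExtension, int index) {
        if (prefetchSize != 0) {
            return Math.max(prefetchExtension, index);
        }
        return prefetchExtension;
    }

    // Proposed rule: same as the non-transacted branch, shrink by the number
    // of acked messages but never go below zero.
    public static int transactedProposed(int prefetchExtension, int index) {
        return Math.max(0, prefetchExtension - index);
    }

    public static void main(String[] args) {
        int prefetchSize = 0; // zero prefetch, i.e. a pulling consumer
        int[][] cases = { {1, 1}, {2, 1}, {2, 2} }; // {prefetchExtension, n}
        for (int[] c : cases) {
            System.out.println("ext=" + c[0] + " n=" + c[1]
                    + " original=" + transactedOriginal(prefetchSize, c[0], c[1])
                    + " proposed=" + transactedProposed(c[0], c[1]));
        }
    }
}
```

Under these assumptions the original rule leaves an extension of 1 or 2 in place after the ack (as in the first trace), while the proposed rule brings it back to 0 when n matches the extension (as in the second trace).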
> Zero Prefetch not working
> -------------------------
>
> Key: AMQ-2567
> URL: https://issues.apache.org/activemq/browse/AMQ-2567
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.3.0
> Environment: Unix/Windows, Java 1.6
> Reporter: Rudolf Janz
> Attachments: PrefetchSubscription.java,
> ZeroPrefetchConsumerTest.java, ZeroPrefetchConsumerTest.java
>
>
> I have problems using the zero prefetch. The consumer starts prefetching
> messages after some receives.
>
> Our use case is that we have a large computation which is split into smaller
> sub jobs. These jobs are sent via ActiveMQ to some processing nodes on
> different machines. The duration of jobs differs very much (10s to some
> minutes). The jobs are sent by decreasing estimated computation time. If one
> of the consumers which receives a large job prefetches some other jobs, these
> will be processed later. In the meantime the remaining consumers are idle,
> and the total computation time is much longer than necessary.
>
> I have modified the existing ZeroPrefetchConsumerTest to test for the problem
> (I have removed the other test methods).
>
> Two consumers (C1 and C2) are instantiated. These messages are sent:
> 1,2,3,4,5,6,7,8,9
>
> C1 reads 4 times, receives 1,2,3,4 -> this is correct
>
> Now C2 reads; it receives 8 instead of 5, the next message in the queue.
> The reason is that C1 prefetched 5, 6, 7, which should not have
> happened (sometimes C1 only prefetches 5, 6).
>
> The problem can be seen in the JMX Console as well, after a while, the first
> consumer has more than one dispatched message and the queue has an
> InflightCount of 3, although there are only two consumers!
>
> The last version that we used was 4.1.1, that worked.
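The scenario from the description can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not how the broker is implemented: the class, the receive method and the rule "the broker fills a consumer up to 1 + prefetchExtension messages on each receive" are all hypothetical, chosen just to show how a leftover extension on C1 makes C2 skip messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ZeroPrefetchToyModel {

    // A consumer with the messages dispatched to it but not yet read.
    static final class Consumer {
        final Deque<Integer> dispatched = new ArrayDeque<>();
        int prefetchExtension; // should stay 0 for a zero-prefetch consumer
    }

    // One receive() call: the toy broker dispatches until the consumer holds
    // 1 + prefetchExtension messages, then the consumer reads the oldest one.
    public static int receive(Deque<Integer> queue, Consumer c) {
        while (c.dispatched.size() < 1 + c.prefetchExtension && !queue.isEmpty()) {
            c.dispatched.add(queue.poll());
        }
        return c.dispatched.poll();
    }

    // Messages 1..9 are queued, C1 reads four times, then C2 reads once.
    public static int firstMessageSeenByC2(int c1Extension) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= 9; i++) {
            queue.add(i);
        }
        Consumer c1 = new Consumer();
        c1.prefetchExtension = c1Extension;
        Consumer c2 = new Consumer();
        for (int i = 0; i < 4; i++) {
            receive(queue, c1);
        }
        return receive(queue, c2);
    }

    public static void main(String[] args) {
        System.out.println("C1 extension 0: C2 reads " + firstMessageSeenByC2(0));
        System.out.println("C1 extension 3: C2 reads " + firstMessageSeenByC2(3));
    }
}
```

In this model C2 reads message 5 when C1 has no extension, and message 8 when C1 carries an extension of 3, because C1 is then still holding 5, 6 and 7.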