[ 
https://issues.apache.org/activemq/browse/AMQ-2567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=56925#action_56925
 ] 

Rudolf Janz commented on AMQ-2567:
----------------------------------

So I added a printout in the client (Expecting/Read) and in the 
PrefetchSubscribtion for all Acks,
C1/2 are the consumer n is the number of acked messaged. This is how it happens

Expecting Msg1
Read Msg1
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 1
Expecting Msg2
C1: n:1 isDeliveredAck(Before): *2*  This is wrong it should be in the same 
state as before the message 1, should have been 
C1: n:1 isDeliveredAck(After): 2
Read Msg2
C1: n:1 isStandardAckXA(Before): 2
C1: n:1 isStandardAckXA(After): 2
Expecting Msg3
C1: n:1 isDeliveredAck(Before): 2
C1: n:1 isDeliveredAck(After): 2
Read Msg3
C1: n:1 isStandardAckXA(Before): 2
C1: n:1 isStandardAckXA(After): 2
Expecting Msg4
Read Msg4
C1: n:1 isDeliveredAck(Before): 3
C1: n:1 isDeliveredAck(After): 3
C1: n:1 isStandardAckXA(Before): 3
C1: n:1 isStandardAckXA(After): 3
Expecting Msg5
Read Msg8
C2: n:1 isDeliveredAck(Before): 1
C2: n:1 isDeliveredAck(After): 1
C2: n:1 isStandardAckXA(Before): 1
C2: n:1 isStandardAckXA(After): 1

I think the problem is in the standardAck, this should decrement prefetchIndex. 
But I guess I am forgetting something (I extended receiving two messages in the 
transaction in the test case) 

{code}
            if (ack.isStandardAck()) {
                   ...
                            if (context.isInTransaction()) {
                                // extend prefetch window only if not a pulling
                                // consumer
                                
System.out.println(prefix+"isStandardAckXA(Before): " +prefetchExtension);
//RJ                                if (getPrefetchSize() != 0) {
//RJ                                   prefetchExtension = 
Math.max(prefetchExtension, index );
//RJ                               }
//RJ do the same as in the non transacted case
                                prefetchExtension = Math.max(0,    
prefetchExtension - index);
                                
System.out.println(prefix+"isStandardAckXA(After): " +prefetchExtension);
                            } else {
                                
System.out.println(prefix+"isStandardAck(Before): " +prefetchExtension);
                                prefetchExtension = Math.max(0,
}}
                                        prefetchExtension - index);
                                
System.out.println(prefix+"isStandardAck(After): " +prefetchExtension);
                            }
{code}


Expecting Msg1
Read Msg1
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg2
Read Msg2
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg3
Read Msg3
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg4
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
Read Msg4
C1: n:1 isStandardAckXA(Before): 1
C1: n:1 isStandardAckXA(After): 0
Expecting Msg5
C2: n:1 isDeliveredAck(Before): 1
C2: n:1 isDeliveredAck(After): 1
Read Msg5
C2: n:1 isStandardAckXA(Before): 1
C2: n:1 isStandardAckXA(After): 0
Expecting Msg6
Read Msg6
Expecting Msg7
C1: n:1 isDeliveredAck(Before): 1
C1: n:1 isDeliveredAck(After): 1
Read Msg7
C1: n:1 isDeliveredAck(Before): 2
C1: n:1 isDeliveredAck(After): 2
C1: n:2 isStandardAckXA(Before): 2
C1: n:2 isStandardAckXA(After): 0
Expecting Msg8
Read Msg8
C2: n:1 isDeliveredAck(Before): 1
C2: n:1 isDeliveredAck(After): 1
C2: n:1 isStandardAckXA(Before): 1
C2: n:1 isStandardAckXA(After): 0

THis look ok to me now

> Zero Prefetch not working
> -------------------------
>
>                 Key: AMQ-2567
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2567
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.0
>         Environment: Unix/Windows, Java 1.6
>            Reporter: Rudolf Janz
>         Attachments: PrefetchSubscription.java, 
> ZeroPrefetchConsumerTest.java, ZeroPrefetchConsumerTest.java
>
>
> I have problems using the zero prefetch. The consumer starts prefetching 
> messages after some receives. 
>  
> Our use case is, that we have a large computation which is split into smaller 
> sub jobs. These jobs are sent via ActiveMQ to some processing nodes on 
> different machines. The duration of jobs differs very much (10s to some 
> minutes). The jobs are sent by decreasing estimated computation time. If one 
> of the consumers which receives a large job prefetches some other jobs, these 
> will be processed later. In the meantime the remaining consumers are idle, 
> and the total computation time is much longer than necessary.
>  
> I have modified the existing ZeroPrefetchConsumerTest to test for the problem 
> (I have removed the other test methods). 
>  
> Two consumers (C1  and C2) are instantiated. These messages are sent:
> 1,2,3,4,5,6,7,8,9
>  
> C1 reads 4 times, receives 1,2,3,4 -> this is correct
>  
> now C2 reads, it receives 8 not 5, which is the next message in the queue. 
> The reason is, that C1 prefetched 5, 6, 7, that should not have 
> happened.(sometimes C1 only prefetches 5,6)
>  
> The problem can be seen in the JMX Console as well, after a while, the first 
> consumer has more than one dispatched message and the queue has an 
> InflightCount of 3, although there are only two consumers!
>  
> The last version that we used was 4.1.1, that worked.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to