>
>
>> cheers,
>>
>> Rob
>>
>> On 30 Dec 2006, at 23:49, [EMAIL PROTECTED] wrote:
>>
>> > Author: chirino
>> > Date: Sat Dec 30 15:49:03 2006
>> > New Revision: 491346
>> >
>> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
>> > Log:
>> > Fix for CursorDurableTest.
>> > The TopicStorePrefetch was iterating items that were in the
>> > subscription but not added to the pending list.
>> >
>> > Modified:
>> > incubator/activemq/trunk/activemq-core/src/main/java/org/
>> apache/
>> > activemq/broker/region/PrefetchSubscription.java
>> > incubator/activemq/trunk/activemq-core/src/main/java/org/
>> apache/
>> > activemq/broker/region/cursors/TopicStorePrefetch.java
>> >
>> > Modified: incubator/activemq/trunk/activemq-core/src/main/
java/org/
>> > apache/activemq/broker/region/PrefetchSubscription.java
>> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
>> activemq-
>> > core/src/main/java/org/apache/activemq/broker/region/
>> > PrefetchSubscription.java?
view=diff&rev=491346&r1=491345&r2=491346
>> >
>>
=====================================================================
>> =
>> > ========
>> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
>> apache/
>> > activemq/broker/region/PrefetchSubscription.java (original)
>> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
>> apache/
>> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
>> > 15:49:03 2006
>> > @@ -406,7 +406,9 @@
>> > pending.reset();
>> > while(pending.hasNext()&&!isFull()
>> > &&count<numberToDispatch){
>> > MessageReference
>> node=pending.next();
>> > -
>> > + if ( node == null )
>> > + break;
>> > +
>> > if(canDispatch(node)){
>> > pending.remove();
>> > // Message may have been
>> > sitting in the pending list a while
>> >
>> > Modified: incubator/activemq/trunk/activemq-core/src/main/
java/org/
>> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
>> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
>> activemq-
>> > core/src/main/java/org/apache/activemq/broker/region/cursors/
>> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
>> >
>>
=====================================================================
>> =
>> > ========
>> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
>> apache/
>> > activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
>> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
>> apache/
>> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat
Dec 30
>> > 15:49:03 2006
>> > @@ -20,7 +20,7 @@
>> >
>> > import java.io.IOException;
>> > import java.util.LinkedList;
>> > -import javax.jms.JMSException;
>> > +
>> > import org.apache.activemq.broker.region.Destination;
>> > import org.apache.activemq.broker.region.MessageReference;
>> > import org.apache.activemq.broker.region.Topic;
>> > @@ -48,6 +48,10 @@
>> > private String subscriberName;
>> > private Destination regionDestination;
>> >
>> > + boolean empty=true;
>> > + private MessageId firstMessageId;
>> > + private MessageId lastMessageId;
>> > +
>> > /**
>> > * @param topic
>> > * @param clientId
>> > @@ -73,7 +77,7 @@
>> > * @return true if there are no pending messages
>> > */
>> > public boolean isEmpty(){
>> > - return batchList.isEmpty();
>> > + return empty;
>> > }
>> >
>> > public synchronized int size(){
>> > @@ -86,27 +90,55 @@
>> > }
>> >
>> > public synchronized void addMessageLast(MessageReference
node)
>> > throws Exception{
>> > - if(node!=null){
>> > + if(node!=null){
>> > + if( empty ) {
>> > + firstMessageId =
node.getMessageId();
>> > + empty=false;
>> > + }
>> > + lastMessageId = node.getMessageId();
>> > node.decrementReferenceCount();
>> > }
>> > }
>> >
>> > - public synchronized boolean hasNext(){
>> > - if(isEmpty()){
>> > - try{
>> > - fillBatch();
>> > - }catch(Exception e){
>> > - log.error("Failed to fill batch",e);
>> > - throw new RuntimeException(e);
>> > - }
>> > - }
>> > + public synchronized boolean hasNext() {
>> > return !isEmpty();
>> > }
>> >
>> > public synchronized MessageReference next(){
>> > - Message result = (Message)batchList.removeFirst();
>> > - result.setRegionDestination(regionDestination);
>> > - return result;
>> > +
>> > + if( empty ) {
>> > + return null;
>> > + } else {
>> > +
>> > + // We may need to fill in the batch...
>> > + if(batchList.isEmpty()){
>> > + try{
>> > + fillBatch();
>> > + }catch(Exception e){
>> > + log.error("Failed to fill batch",e);
>> > + throw new RuntimeException(e);
>> > + }
>> > + if( batchList.isEmpty()) {
>> > + return null;
>> > + }
>> > + }
>> > +
>> > + Message result = (Message)batchList.removeFirst();
>> > +
>> > + if( firstMessageId != null ) {
>> > + // Skip messages until we get to the first
message.
>> > + if( !result.getMessageId().equals
>> (firstMessageId) )
>> > + return null;
>> > + firstMessageId = null;
>> > + }
>> > + if( lastMessageId != null ) {
>> > + if( result.getMessageId().equals
>> (lastMessageId) ) {
>> > + empty=true;
>> > + }
>> > + }
>> > + result.setRegionDestination(regionDestination);
>> > + return result;
>> > + }
>> > }
>> >
>> > public void reset(){
>> > @@ -130,13 +162,7 @@
>> >
>> > // implementation
>> > protected void fillBatch() throws Exception{
>> > - store.recoverNextMessages(clientId,subscriberName,
>> > - maxBatchSize,this);
>> > - // this will add more messages to the batch list
>> > - if(!batchList.isEmpty()){
>> > - Message message=(Message)batchList.getLast();
>> > -
>> > - }
>> > + store.recoverNextMessages
>> > (clientId,subscriberName,maxBatchSize,this);
>> > }
>> >
>> > public void gc() {
>> >
>> >
>>
>>
>
>
> --
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com