cheers,
Rob
On 30 Dec 2006, at 23:49, [EMAIL PROTECTED] wrote:
> Author: chirino
> Date: Sat Dec 30 15:49:03 2006
> New Revision: 491346
>
> URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> Log:
> Fix for CursorDurableTest.
> The TopicStorePrefetch was iterating items that were in the
> subscription but not added to the pending list.
>
> Modified:
> incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/
> activemq/broker/region/PrefetchSubscription.java
> incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/
> activemq/broker/region/cursors/TopicStorePrefetch.java
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> apache/activemq/broker/region/PrefetchSubscription.java
> URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
activemq-
> core/src/main/java/org/apache/activemq/broker/region/
> PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
>
=====================================================================
=
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/
> activemq/broker/region/PrefetchSubscription.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/
> activemq/broker/region/PrefetchSubscription.java Sat Dec 30
> 15:49:03 2006
> @@ -406,7 +406,9 @@
> pending.reset();
> while(pending.hasNext()&&!isFull()
> &&count<numberToDispatch){
> MessageReference
node=pending.next();
> -
> + if ( node == null )
> + break;
> +
> if(canDispatch(node)){
> pending.remove();
> // Message may have been
> sitting in the pending list a while
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
activemq-
> core/src/main/java/org/apache/activemq/broker/region/cursors/
> TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
>
=====================================================================
=
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/
> activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/
> activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
> 15:49:03 2006
> @@ -20,7 +20,7 @@
>
> import java.io.IOException;
> import java.util.LinkedList;
> -import javax.jms.JMSException;
> +
> import org.apache.activemq.broker.region.Destination;
> import org.apache.activemq.broker.region.MessageReference;
> import org.apache.activemq.broker.region.Topic;
> @@ -48,6 +48,10 @@
> private String subscriberName;
> private Destination regionDestination;
>
> + boolean empty=true;
> + private MessageId firstMessageId;
> + private MessageId lastMessageId;
> +
> /**
> * @param topic
> * @param clientId
> @@ -73,7 +77,7 @@
> * @return true if there are no pending messages
> */
> public boolean isEmpty(){
> - return batchList.isEmpty();
> + return empty;
> }
>
> public synchronized int size(){
> @@ -86,27 +90,55 @@
> }
>
> public synchronized void addMessageLast(MessageReference node)
> throws Exception{
> - if(node!=null){
> + if(node!=null){
> + if( empty ) {
> + firstMessageId = node.getMessageId();
> + empty=false;
> + }
> + lastMessageId = node.getMessageId();
> node.decrementReferenceCount();
> }
> }
>
> - public synchronized boolean hasNext(){
> - if(isEmpty()){
> - try{
> - fillBatch();
> - }catch(Exception e){
> - log.error("Failed to fill batch",e);
> - throw new RuntimeException(e);
> - }
> - }
> + public synchronized boolean hasNext() {
> return !isEmpty();
> }
>
> public synchronized MessageReference next(){
> - Message result = (Message)batchList.removeFirst();
> - result.setRegionDestination(regionDestination);
> - return result;
> +
> + if( empty ) {
> + return null;
> + } else {
> +
> + // We may need to fill in the batch...
> + if(batchList.isEmpty()){
> + try{
> + fillBatch();
> + }catch(Exception e){
> + log.error("Failed to fill batch",e);
> + throw new RuntimeException(e);
> + }
> + if( batchList.isEmpty()) {
> + return null;
> + }
> + }
> +
> + Message result = (Message)batchList.removeFirst();
> +
> + if( firstMessageId != null ) {
> + // Skip messages until we get to the first message.
> + if( !result.getMessageId().equals
(firstMessageId) )
> + return null;
> + firstMessageId = null;
> + }
> + if( lastMessageId != null ) {
> + if( result.getMessageId().equals
(lastMessageId) ) {
> + empty=true;
> + }
> + }
> + result.setRegionDestination(regionDestination);
> + return result;
> + }
> }
>
> public void reset(){
> @@ -130,13 +162,7 @@
>
> // implementation
> protected void fillBatch() throws Exception{
> - store.recoverNextMessages(clientId,subscriberName,
> - maxBatchSize,this);
> - // this will add more messages to the batch list
> - if(!batchList.isEmpty()){
> - Message message=(Message)batchList.getLast();
> -
> - }
> + store.recoverNextMessages
> (clientId,subscriberName,maxBatchSize,this);
> }
>
> public void gc() {
>
>