Repository: activemq-artemis
Updated Branches:
  refs/heads/master d4dd07023 -> a5a993ed9


ARTEMIS-332 - test added / better dealing with critical errors on paging


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a5a993ed
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a5a993ed
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a5a993ed

Branch: refs/heads/master
Commit: a5a993ed9d67e47e97a0a8198d75600188df2eae
Parents: d4dd070
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Wed Jan 6 15:45:19 2016 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Jan 6 19:42:45 2016 -0500

----------------------------------------------------------------------
 .../artemis/core/paging/PagingStore.java        |   2 -
 .../core/paging/cursor/PageCursorProvider.java  |   5 +-
 .../core/paging/cursor/PageSubscription.java    |   5 +-
 .../core/paging/cursor/PagedReference.java      |   3 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  17 +-
 .../cursor/impl/PageCursorProviderImpl.java     |  10 +-
 .../cursor/impl/PageSubscriptionImpl.java       | 102 +++------
 .../core/paging/impl/PagingStoreImpl.java       |   6 -
 .../artemis/core/server/MessageReference.java   |   4 +-
 .../activemq/artemis/core/server/Queue.java     |   2 +-
 .../core/server/impl/LastValueQueue.java        |  79 ++-----
 .../artemis/core/server/impl/QueueImpl.java     |  63 +++---
 .../artemis/core/server/impl/RefsOperation.java |  37 ++--
 .../tests/extras/byteman/PagingOMETest.java     | 206 +++++++++++++++++++
 .../storage/PersistMultiThreadTest.java         |   4 -
 15 files changed, 320 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index c8808b3..e831966 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -47,8 +47,6 @@ public interface PagingStore extends ActiveMQComponent {
 
    int getNumberOfPages();
 
-   void criticalException(Throwable e);
-
    /**
     * Returns the page id of the current page in which the system is writing 
files.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index c8404ba..951b83c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 
@@ -25,7 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
  */
 public interface PageCursorProvider {
 
-   PageCache getPageCache(long pageNr) throws ActiveMQException;
+   PageCache getPageCache(long pageNr);
 
    PagedReference newReference(final PagePosition pos, final PagedMessage msg, 
PageSubscription sub);
 
@@ -39,7 +38,7 @@ public interface PageCursorProvider {
 
    PageSubscription createSubscription(long queueId, Filter filter, boolean 
durable);
 
-   PagedMessage getMessage(PagePosition pos) throws ActiveMQException;
+   PagedMessage getMessage(PagePosition pos);
 
    void processReload() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 386f21f..df2ccc3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.paging.cursor;
 
 import java.util.concurrent.Executor;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -96,7 +95,7 @@ public interface PageSubscription {
 
    void reloadPageCompletion(PagePosition position);
 
-   void reloadPageInfo(long pageNr) throws ActiveMQException;
+   void reloadPageInfo(long pageNr);
 
    /**
     * To be called when the cursor decided to ignore a position.
@@ -148,7 +147,7 @@ public interface PageSubscription {
     * @param pos
     * @return
     */
-   PagedMessage queryMessage(PagePosition pos) throws ActiveMQException;
+   PagedMessage queryMessage(PagePosition pos);
 
    /**
     * @return executor used by the PageSubscription

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
index 46041c5..c1ff089 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 
@@ -24,5 +23,5 @@ public interface PagedReference extends MessageReference {
 
    PagePosition getPosition();
 
-   PagedMessage getPagedMessage() throws ActiveMQException;
+   PagedMessage getPagedMessage();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 964737f..82b0e92 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.paging.cursor;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -50,12 +49,12 @@ public class PagedReferenceImpl implements PagedReference {
    private boolean alreadyAcked;
 
    @Override
-   public ServerMessage getMessage() throws ActiveMQException {
+   public ServerMessage getMessage() {
       return getPagedMessage().getMessage();
    }
 
    @Override
-   public synchronized PagedMessage getPagedMessage() throws ActiveMQException 
{
+   public synchronized PagedMessage getPagedMessage() {
       PagedMessage returnMessage = message != null ? message.get() : null;
 
       // We only keep a few references on the Queue from paging...
@@ -111,7 +110,7 @@ public class PagedReferenceImpl implements PagedReference {
          try {
             messageEstimate = getMessage().getMemoryEstimate();
          }
-         catch (ActiveMQException e) {
+         catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }
       }
@@ -120,13 +119,7 @@ public class PagedReferenceImpl implements PagedReference {
 
    @Override
    public MessageReference copy(final Queue queue) {
-      try {
-         return new PagedReferenceImpl(this.position, this.getPagedMessage(), 
this.subscription);
-      }
-      catch (ActiveMQException e) {
-         ActiveMQServerLogger.LOGGER.warn(e);
-         return this;
-      }
+      return new PagedReferenceImpl(this.position, this.getPagedMessage(), 
this.subscription);
    }
 
    @Override
@@ -141,7 +134,7 @@ public class PagedReferenceImpl implements PagedReference {
                deliveryTime = 0L;
             }
          }
-         catch (ActiveMQException e) {
+         catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
             return 0L;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index ef57e1c..5f5e1b3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -24,8 +24,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -111,7 +109,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
    }
 
    @Override
-   public PagedMessage getMessage(final PagePosition pos) throws 
ActiveMQException {
+   public PagedMessage getMessage(final PagePosition pos) {
       PageCache cache = getPageCache(pos.getPageNr());
 
       if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
@@ -130,7 +128,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
    }
 
    @Override
-   public PageCache getPageCache(final long pageId) throws ActiveMQException {
+   public PageCache getPageCache(final long pageId) {
       try {
          PageCache cache;
          synchronized (softCache) {
@@ -157,8 +155,8 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
          return cache;
       }
-      catch (Throwable e) {
-         throw new ActiveMQIOErrorException("Couldn't complete paging due to 
an IO Exception on Paging - " + e.getMessage(), e);
+      catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index d7a6ded..9c1702e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -333,7 +333,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
       return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue 
+ ", filter = " + filter + "]";
    }
 
-   private PagedReference getReference(PagePosition pos) throws 
ActiveMQException {
+   private PagedReference getReference(PagePosition pos) {
       return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), 
this);
    }
 
@@ -342,7 +342,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
       return new CursorIterator();
    }
 
-   private PagedReference internalGetNext(final PagePosition pos) throws 
ActiveMQException {
+   private PagedReference internalGetNext(final PagePosition pos) {
       PagePosition retPos = pos.nextMessage();
 
       PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
@@ -471,17 +471,11 @@ final class PageSubscriptionImpl implements 
PageSubscription {
          public void onError(final int errorCode, final String errorMessage) {
             error = " errorCode=" + errorCode + ", msg=" + errorMessage;
             ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error);
-            getPagingStore().criticalException(new 
ActiveMQException(errorMessage));
          }
 
          @Override
          public void done() {
-            try {
-               processACK(position);
-            }
-            catch (ActiveMQException e) {
-               getPagingStore().criticalException(e);
-            }
+            processACK(position);
          }
 
          @Override
@@ -511,11 +505,10 @@ final class PageSubscriptionImpl implements 
PageSubscription {
 
    @Override
    public void addPendingDelivery(final PagePosition position) {
-      try {
-         getPageInfo(position).incrementPendingTX();
-      }
-      catch (Exception e) {
-         getPagingStore().criticalException(e);
+      PageCursorInfo info = getPageInfo(position);
+
+      if (info != null) {
+         info.incrementPendingTX();
       }
    }
 
@@ -535,7 +528,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
    }
 
    @Override
-   public PagedMessage queryMessage(PagePosition pos) throws ActiveMQException 
{
+   public PagedMessage queryMessage(PagePosition pos) {
       return cursorProvider.getMessage(pos);
    }
 
@@ -554,32 +547,17 @@ final class PageSubscriptionImpl implements 
PageSubscription {
    @Override
    public void reloadPreparedACK(final Transaction tx, final PagePosition 
position) {
       deliveredCount.incrementAndGet();
-      try {
-         installTXCallback(tx, position);
-      }
-      catch (Exception e) {
-         getPagingStore().criticalException(e);
-      }
+      installTXCallback(tx, position);
    }
 
    @Override
    public void positionIgnored(final PagePosition position) {
-      try {
-         processACK(position);
-      }
-      catch (Exception e) {
-         getPagingStore().criticalException(e);
-      }
+      processACK(position);
    }
 
    public void lateDeliveryRollback(PagePosition position) {
-      try {
-         PageCursorInfo cursorInfo = processACK(position);
-         cursorInfo.decrementPendingTX();
-      }
-      catch (ActiveMQException e) {
-         getPagingStore().criticalException(e);
-      }
+      PageCursorInfo cursorInfo = processACK(position);
+      cursorInfo.decrementPendingTX();
    }
 
    @Override
@@ -750,15 +728,15 @@ final class PageSubscriptionImpl implements 
PageSubscription {
    }
 
    @Override
-   public void reloadPageInfo(long pageNr) throws ActiveMQException {
+   public void reloadPageInfo(long pageNr) {
       getPageInfo(pageNr, true);
    }
 
-   private PageCursorInfo getPageInfo(final PagePosition pos) throws 
ActiveMQException {
+   private PageCursorInfo getPageInfo(final PagePosition pos) {
       return getPageInfo(pos.getPageNr(), true);
    }
 
-   private PageCursorInfo getPageInfo(final long pageNr, boolean create) 
throws ActiveMQException {
+   private PageCursorInfo getPageInfo(final long pageNr, boolean create) {
       synchronized (consumedPages) {
          PageCursorInfo pageInfo = consumedPages.get(pageNr);
 
@@ -792,7 +770,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
 
    // To be called only after the ACK has been processed and guaranteed to be 
on storage
    // The only exception is on non storage events such as not matching messages
-   private PageCursorInfo processACK(final PagePosition pos) throws 
ActiveMQException {
+   private PageCursorInfo processACK(final PagePosition pos) {
       if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) {
          if (isTrace) {
             ActiveMQServerLogger.LOGGER.trace("a new position is being 
processed as ACK");
@@ -828,7 +806,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
     * @param tx
     * @param position
     */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) throws ActiveMQException {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
       if (position.getRecordID() >= 0) {
          // It needs to persist, otherwise the cursor will return to the fist 
page position
          tx.setContainsPersistent();
@@ -960,13 +938,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
       }
 
       public boolean isDone() {
-         try {
-            return completePage != null || (getNumberOfMessages() == 
confirmed.get() && pendingTX.get() == 0);
-         }
-         catch (ActiveMQException e) {
-            getPagingStore().criticalException(e);
-            throw new RuntimeException(e.getMessage(), e);
-         }
+         return completePage != null || (getNumberOfMessages() == 
confirmed.get() && pendingTX.get() == 0);
       }
 
       public boolean isPendingDelete() {
@@ -1047,7 +1019,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
          }
       }
 
-      private int getNumberOfMessages() throws ActiveMQException {
+      private int getNumberOfMessages() {
          if (wasLive) {
             // if the page was live at any point, we need to
             // get the number of messages from the page-cache
@@ -1089,12 +1061,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
             List<PagePosition> positions = entry.getValue();
 
             for (PagePosition confirmed : positions) {
-               try {
-                  cursor.processACK(confirmed);
-               }
-               catch (ActiveMQException e) {
-                  getPagingStore().criticalException(e);
-               }
+               cursor.processACK(confirmed);
                cursor.deliveredCount.decrementAndGet();
             }
 
@@ -1165,21 +1132,15 @@ final class PageSubscriptionImpl implements 
PageSubscription {
             return currentDelivery;
          }
 
-         try {
-            if (position == null) {
-               position = getStartPosition();
-            }
-
-            currentDelivery = moveNext();
-            return currentDelivery;
-         }
-         catch (ActiveMQException e) {
-            getPagingStore().criticalException(e);
-            throw new IllegalStateException(e.getMessage(), e);
+         if (position == null) {
+            position = getStartPosition();
          }
+
+         currentDelivery = moveNext();
+         return currentDelivery;
       }
 
-      private PagedReference moveNext() throws ActiveMQException {
+      private PagedReference moveNext() {
          synchronized (PageSubscriptionImpl.this) {
             boolean match = false;
 
@@ -1309,14 +1270,9 @@ final class PageSubscriptionImpl implements 
PageSubscription {
          deliveredCount.incrementAndGet();
          PagedReference delivery = currentDelivery;
          if (delivery != null) {
-            try {
-               PageCursorInfo info = 
PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
-               if (info != null) {
-                  info.remove(currentDelivery.getPosition());
-               }
-            }
-            catch (ActiveMQException e) {
-               getPagingStore().criticalException(e);
+            PageCursorInfo info = 
PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
+            if (info != null) {
+               info.remove(currentDelivery.getPosition());
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 9136c17..1463b3c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -176,12 +176,6 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
-   @Override
-   public void criticalException(Throwable e) {
-      ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-      storeFactory.criticalException(e);
-   }
-
    /**
     * @param addressSettings
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 95b30b2..0ff55ac 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
 /**
  * A reference to a message.
  *
@@ -27,7 +25,7 @@ public interface MessageReference {
 
    boolean isPaged();
 
-   ServerMessage getMessage() throws ActiveMQException;
+   ServerMessage getMessage();
 
    /**
     * We define this method aggregation here because on paging we need to hold 
the original estimate,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 9ea60cd..81bd565 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -239,7 +239,7 @@ public interface Queue extends Bindable {
     */
    void deliverScheduledMessages() throws ActiveMQException;
 
-   void postAcknowledge(MessageReference ref) throws ActiveMQException;
+   void postAcknowledge(MessageReference ref);
 
    float getRate();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index c6d5aee..5420688 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -67,15 +66,7 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addTail(final MessageReference ref, final boolean 
direct) {
-      SimpleString prop;
-
-      try {
-         prop = 
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
-      }
-      catch (ActiveMQException e) {
-         criticalError(e);
-         throw new IllegalStateException(e);
-      }
+      SimpleString prop = 
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -112,59 +103,45 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addHead(final MessageReference ref) {
-      try {
-         SimpleString prop = 
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop = 
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
 
-         if (prop != null) {
-            HolderReference hr = map.get(prop);
+      if (prop != null) {
+         HolderReference hr = map.get(prop);
 
-            if (hr != null) {
-               // We keep the current ref and ack the one we are returning
+         if (hr != null) {
+            // We keep the current ref and ack the one we are returning
 
-               super.referenceHandled();
+            super.referenceHandled();
 
-               try {
-                  super.acknowledge(ref);
-               }
-               catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
-               }
+            try {
+               super.acknowledge(ref);
             }
-            else {
-               map.put(prop, (HolderReference) ref);
-
-               super.addHead(ref);
+            catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
             }
          }
          else {
+            map.put(prop, (HolderReference) ref);
+
             super.addHead(ref);
          }
       }
-      catch (ActiveMQException e) {
-         criticalError(e);
-         throw new IllegalStateException(e);
+      else {
+         super.addHead(ref);
       }
    }
 
    @Override
    protected void refRemoved(MessageReference ref) {
-      try {
-
-         synchronized (this) {
-            SimpleString prop = 
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      synchronized (this) {
+         SimpleString prop = 
ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
 
-            if (prop != null) {
-               map.remove(prop);
-            }
+         if (prop != null) {
+            map.remove(prop);
          }
-
-         super.refRemoved(ref);
-      }
-      catch (ActiveMQException e) {
-         criticalError(e);
-         throw new IllegalStateException(e);
       }
 
+      super.refRemoved(ref);
    }
 
    private class HolderReference implements MessageReference {
@@ -223,13 +200,7 @@ public class LastValueQueue extends QueueImpl {
 
       @Override
       public ServerMessage getMessage() {
-         try {
-            return ref.getMessage();
-         }
-         catch (ActiveMQException e) {
-            criticalError(e);
-            throw new IllegalStateException(e);
-         }
+         return ref.getMessage();
       }
 
       @Override
@@ -285,13 +256,7 @@ public class LastValueQueue extends QueueImpl {
        */
       @Override
       public int getMessageMemoryEstimate() {
-         try {
-            return ref.getMessage().getMemoryEstimate();
-         }
-         catch (ActiveMQException e) {
-            criticalError(e);
-            throw new IllegalStateException(e);
-         }
+         return ref.getMessage().getMemoryEstimate();
       }
 
       /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index c963e4d..12b5231 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1054,13 +1054,7 @@ public class QueueImpl implements Queue {
 
    @Override
    public void cancel(final Transaction tx, final MessageReference reference, 
boolean ignoreRedeliveryCheck) {
-      try {
-         getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
-      }
-      catch (ActiveMQException e) {
-         criticalError(e);
-         getPageSubscription().getPagingStore().criticalException(e);
-      }
+      getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
    }
 
    @Override
@@ -1777,12 +1771,7 @@ public class QueueImpl implements Queue {
 
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
-      try {
-         messageReferences.addTail(ref, ref.getMessage().getPriority());
-      }
-      catch (ActiveMQException e) {
-         criticalError(e);
-      }
+      messageReferences.addTail(ref, getPriority(ref));
    }
 
    /**
@@ -1793,20 +1782,24 @@ public class QueueImpl implements Queue {
     * @param ref
     */
    private void internalAddHead(final MessageReference ref) {
+      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+      refAdded(ref);
+
+      int priority = getPriority(ref);
+
+      messageReferences.addHead(ref, priority);
+   }
+
+   private int getPriority(MessageReference ref) {
       try {
-         queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
-         refAdded(ref);
-         messageReferences.addHead(ref, ref.getMessage().getPriority());
+         return ref.getMessage().getPriority();
       }
-      catch (ActiveMQException e) {
-         criticalError(e);
+      catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         return 4; // the default one in case of failure
       }
    }
 
-   void criticalError(ActiveMQException e) {
-      storageManager.criticalError(e);
-   }
-
    private synchronized void doInternalPoll() {
 
       int added = 0;
@@ -2036,9 +2029,9 @@ public class QueueImpl implements Queue {
             // But we don't use the groupID on internal queues (clustered 
queues) otherwise the group map would leak forever
             return 
ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
          }
-         catch (ActiveMQException e) {
-            criticalError(e);
-            throw new IllegalStateException(e);
+         catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+            return null;
          }
       }
    }
@@ -2543,9 +2536,9 @@ public class QueueImpl implements Queue {
             return false;
          }
       }
-      catch (ActiveMQException e) {
-         criticalError(e);
-         throw new IllegalStateException(e);
+      catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         return false;
       }
    }
 
@@ -2584,7 +2577,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void postAcknowledge(final MessageReference ref) throws 
ActiveMQException {
+   public void postAcknowledge(final MessageReference ref) {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
       queue.decDelivering();
@@ -2594,9 +2587,17 @@ public class QueueImpl implements Queue {
          return;
       }
 
-      final ServerMessage message = ref.getMessage();
+      ServerMessage message;
+
+      try {
+         message = ref.getMessage();
+      }
+      catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         message = null;
+      }
 
-      boolean durableRef = message.isDurable() && queue.durable;
+      boolean durableRef = message != null && message.isDurable() && 
queue.durable;
 
       try {
          message.decrementRefCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 92d1a61..9b72f51 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -38,7 +37,7 @@ public class RefsOperation extends 
TransactionOperationAbstract {
    private Queue queue;
    List<MessageReference> refsToAck = new ArrayList<>();
 
-   List<ServerMessage> pagedMessagesToPostACK = null;
+   List<MessageReference> pagedMessagesToPostACK = null;
 
    /**
     * It will ignore redelivery check, which is used during consumer.close
@@ -56,13 +55,13 @@ public class RefsOperation extends 
TransactionOperationAbstract {
       ignoreRedeliveryCheck = true;
    }
 
-   synchronized void addAck(final MessageReference ref) throws 
ActiveMQException {
+   synchronized void addAck(final MessageReference ref) {
       refsToAck.add(ref);
       if (ref.isPaged()) {
          if (pagedMessagesToPostACK == null) {
             pagedMessagesToPostACK = new ArrayList<>();
          }
-         pagedMessagesToPostACK.add(ref.getMessage());
+         pagedMessagesToPostACK.add(ref);
       }
    }
 
@@ -148,32 +147,26 @@ public class RefsOperation extends 
TransactionOperationAbstract {
    public void afterCommit(final Transaction tx) {
       for (MessageReference ref : refsToAck) {
          synchronized (ref.getQueue()) {
-            try {
-               queue.postAcknowledge(ref);
-            }
-            catch (ActiveMQException e) {
-               if (queue instanceof QueueImpl) {
-                  ((QueueImpl) queue).criticalError(e);
-               }
-               else {
-                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-               }
-            }
+            queue.postAcknowledge(ref);
          }
       }
 
       if (pagedMessagesToPostACK != null) {
-         for (ServerMessage msg : pagedMessagesToPostACK) {
-            try {
-               msg.decrementRefCount();
-            }
-            catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-            }
+         for (MessageReference refmsg : pagedMessagesToPostACK) {
+            decrementRefCount(refmsg);
          }
       }
    }
 
+   private void decrementRefCount(MessageReference refmsg) {
+      try {
+         refmsg.getMessage().decrementRefCount();
+      }
+      catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+      }
+   }
+
    @Override
    public synchronized List<MessageReference> getRelatedMessageReferences() {
       List<MessageReference> listRet = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java
----------------------------------------------------------------------
diff --git 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java
 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java
new file mode 100644
index 0000000..aa3ef38
--- /dev/null
+++ 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class PagingOMETest extends ActiveMQTestBase {
+
+   private ServerLocator locator;
+   private ActiveMQServer server;
+   private ClientSessionFactory sf;
+   static final int MESSAGE_SIZE = 1024; // 1k
+
+   private static final int RECEIVE_TIMEOUT = 5000;
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   static boolean failureActive = false;
+
+   public static void refCheck() {
+      if (failureActive) {
+         throw new OutOfMemoryError("fake error");
+      }
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      failureActive = false;
+      locator = createInVMNonHALocator();
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "fakeOME",
+         targetClass = 
"org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl",
+         targetMethod = "getPagedMessage",
+         targetLocation = "ENTRY",
+         action = 
"org.apache.activemq.artemis.tests.extras.byteman.PagingOMETest.refCheck()")})
+   public void testPageCleanup() throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setJournalSyncNonTransactional(false);
+
+      HashMap<String, AddressSettings> map = new HashMap<>();
+      AddressSettings value = new AddressSettings();
+      map.put(ADDRESS.toString(), value);
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+
+      server.start();
+
+      final int numberOfMessages = 2;
+
+      locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(false);
+      locator.setConsumerWindowSize(0);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      Queue queue = server.locateQueue(ADDRESS);
+      queue.getPageSubscription().getPagingStore().startPaging();
+
+      
Assert.assertTrue(queue.getPageSubscription().getPagingStore().isPaging());
+
+      ClientProducer producer = session.createProducer(PagingOMETest.ADDRESS);
+
+      ClientMessage message = null;
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+      session.commit();
+
+      session = sf.createSession(false, false, false);
+
+      session.start();
+
+      assertEquals(numberOfMessages, queue.getMessageCount());
+
+      // The consumer has to be created after the queue.getMessageCount 
assertion
+      // otherwise delivery could alter the messagecount and give us a false 
failure
+      ClientConsumer consumer = session.createConsumer(PagingOMETest.ADDRESS);
+      ClientMessage msg = null;
+
+      msg = consumer.receive(1000);
+
+      failureActive = true;
+      msg.individualAcknowledge();
+      try {
+         session.commit();
+         Assert.fail("exception expected");
+      }
+      catch (Exception expected) {
+      }
+      failureActive = false;
+      session.rollback();
+
+      session.close();
+
+      sf.close();
+
+      locator.close();
+
+      server.stop();
+
+      server.start();
+
+      locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(false);
+      locator.setConsumerWindowSize(0);
+
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(false, false, false);
+
+      consumer = session.createConsumer(PagingOMETest.ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         msg = consumer.receive(1000);
+         Assert.assertNotNull(msg);
+         msg.individualAcknowledge();
+      }
+      Assert.assertNull(consumer.receiveImmediate());
+      session.commit();
+
+      session.close();
+      sf.close();
+      server.stop();
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a5a993ed/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6244330..6351357 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -257,10 +257,6 @@ public class PersistMultiThreadTest extends 
ActiveMQTestBase {
       }
 
       @Override
-      public void criticalException(Throwable e) {
-      }
-
-      @Override
       public int getNumberOfPages() {
          return 0;
       }

Reply via email to