Hiram, there is a regression in
org.apache.activemq.store.LevelDBStorePerDestinationTest
seems nothing is going to terminate the connection in this case.
Skipped the test as it was hanging the ci builds.
see: https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a

On 25 November 2013 18:22,  <chir...@apache.org> wrote:
> Updated Branches:
>   refs/heads/trunk 00cb9a566 -> b0e91d47f
>
>
> Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions 
> so that the clients retry try the operations instead of giving up.  Also 
> retry the problemantic getMessage() call which seems to fail at times.
>
> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47
> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47
> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47
>
> Branch: refs/heads/trunk
> Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b
> Parents: 00cb9a5
> Author: Hiram Chirino <hi...@hiramchirino.com>
> Authored: Mon Nov 25 13:17:58 2013 -0500
> Committer: Hiram Chirino <hi...@hiramchirino.com>
> Committed: Mon Nov 25 13:17:58 2013 -0500
>
> ----------------------------------------------------------------------
>  .../activemq/broker/SuppressReplyException.java |  8 +++++++
>  .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
>  .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
>  .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
>  4 files changed, 32 insertions(+), 7 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> ----------------------------------------------------------------------
> diff --git 
> a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>  
> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> index eb54a12..f2c6502 100644
> --- 
> a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> +++ 
> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> @@ -26,6 +26,14 @@ import java.io.IOException;
>   *
>   */
>  public class SuppressReplyException extends RuntimeException {
> +    public SuppressReplyException(Throwable cause) {
> +        super(cause);
> +    }
> +
> +    public SuppressReplyException(String reason) {
> +        super(reason);
> +    }
> +
>      public SuppressReplyException(String reason, IOException cause) {
>          super(reason, cause);
>      }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> ----------------------------------------------------------------------
> diff --git 
> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>  
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> index e467379..00260d9 100644
> --- 
> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> +++ 
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
>    def getMessage(x: MessageId):Message = {
>      val id = 
> Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
>      val locator = id.getDataLocator()
> -    val msg = client.getMessage(locator)
> +    val msg = client.getMessageWithRetry(locator)
>      msg.setMessageId(id)
>      msg
>    }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> ----------------------------------------------------------------------
> diff --git 
> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>  
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> index 15f7bb0..c0cedce 100755
> --- 
> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> +++ 
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
>  import org.apache.activemq.leveldb.EntryLocator
>  import org.apache.activemq.leveldb.DataLocator
>  import org.fusesource.hawtbuf.ByteArrayOutputStream
> +import org.apache.activemq.broker.SuppressReplyException
>
>  /**
>   * @author <a href="http://hiramchirino.com";>Hiram Chirino</a>
> @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
>            Thread.sleep(100);
>          }
>        }
> -      throw failure;
> +      throw new SuppressReplyException(failure);
>      }
>      try {
>        func
> @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
>      collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
>        val seq = decodeLong(key)
>        var locator = DataLocator(store, value.getValueLocation, 
> value.getValueLength)
> -      val msg = getMessage(locator)
> +      val msg = getMessageWithRetry(locator)
>        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>        msg.getMessageId().setDataLocator(locator)
>        msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
> @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
>          func(XaAckRecord(collectionKey, seq, ack, sub))
>        } else {
>          var locator = DataLocator(store, value.getValueLocation, 
> value.getValueLength)
> -        val msg = getMessage(locator)
> +        val msg = getMessageWithRetry(locator)
>          msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>          msg.getMessageId().setDataLocator(locator)
>          func(msg)
> @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
>      }
>    }
>
> +  def getMessageWithRetry(locator:AnyRef):Message = {
> +    var retry = 0
> +    var rc = getMessage(locator);
> +    while( rc == null ) {
> +      if( retry > 10 )
> +        return null;
> +      Thread.sleep(retry*10)
> +      rc = getMessage(locator);
> +      retry+=1
> +    }
> +    if( retry > 0 ) {
> +      info("Recovered from 'failed getMessage' on retry: "+retry)
> +    }
> +    rc
> +  }
> +
>    def getMessage(locator:AnyRef):Message = {
>      assert(locator!=null)
>      val buffer = locator match {
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> ----------------------------------------------------------------------
> diff --git 
> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>  
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> index 322656f..e4c7a02 100644
> --- 
> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> +++ 
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> @@ -17,7 +17,7 @@
>
>  package org.apache.activemq.leveldb
>
> -import org.apache.activemq.broker.{LockableServiceSupport, 
> BrokerServiceAware, ConnectionContext}
> +import org.apache.activemq.broker.{SuppressReplyException, 
> LockableServiceSupport, BrokerServiceAware, ConnectionContext}
>  import org.apache.activemq.command._
>  import org.apache.activemq.openwire.OpenWireFormat
>  import org.apache.activemq.usage.SystemUsage
> @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with 
> BrokerServiceAware with P
>
>    def check_running = {
>      if( this.isStopped ) {
> -      throw new IOException("Store has been stopped")
> +      throw new SuppressReplyException("Store has been stopped")
>      }
>    }
>
> @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with 
> BrokerServiceAware with P
>    def verify_running = {
>      if( isStopping || isStopped ) {
>        try {
> -        throw new IOException("Not running")
> +        throw new SuppressReplyException("Not running")
>        } catch {
>          case e:IOException =>
>            if( broker_service!=null ) {
>



-- 
http://redhat.com
http://blog.garytully.com

Reply via email to