Author: orudyy
Date: Thu Aug 28 09:43:27 2014
New Revision: 1621106

URL: http://svn.apache.org/r1621106
Log:
QPID-6051: Fix handling of exceptions thrown from post commit or deferred actions on transaction commit

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1621106&r1=1621105&r2=1621106&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Aug 28 09:43:27 2014
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -279,9 +281,9 @@ public class LocalTransaction implements
     public StoreFuture commitAsync(final Runnable deferred)
     {
         sync();
+        StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
         try
         {
-            StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
             if(_transaction != null)
             {
                 future = new StoreFuture()
@@ -325,8 +327,7 @@ public class LocalTransaction implements
                                 }
                                 catch (RuntimeException e)
                                 {
-                                    doRollbackActions();
-                                    throw e;
+                                    handleUnexpectedException(e);
                                 }
                                 finally
                                 {
@@ -350,21 +351,40 @@ public class LocalTransaction implements
                 }
             }
 
-            return future;
         }
         catch (RuntimeException e)
         {
             try
             {
-                doRollbackActions();
+                handleUnexpectedException(e);
             }
             finally
             {
                 resetDetails();
             }
-            throw e;
         }
+        return future;
+    }
 
+    private void handleUnexpectedException(RuntimeException e)
+    {
+        if(e instanceof ConnectionScopedRuntimeException || e instanceof TransportException)
+        {
+            throw e;
+        }
+        else
+        {
+            _logger.error("Unexpected exception on execution of post commit deferred actions", e);
+            boolean continueOnError = Boolean.getBoolean("qpid.broker.exceptionHandler.continue");
+            if (continueOnError)
+            {
+                throw e;
+            }
+            else
+            {
+                Runtime.getRuntime().halt(1);
+            }
+        }
     }
 
     private void doPostTransactionActions()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1621106&r1=1621105&r2=1621106&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Aug 28 09:43:27 2014
@@ -1109,10 +1109,16 @@ public class AMQChannel<T extends AMQPro
                 @Override
                 public void run()
                 {
-                    immediateAction.run();
-                    _txnCommits.incrementAndGet();
-                    _txnStarts.incrementAndGet();
-                    decrementOutstandingTxnsIfNecessary();
+                    try
+                    {
+                        immediateAction.run();
+                    }
+                    finally
+                    {
+                        _txnCommits.incrementAndGet();
+                        _txnStarts.incrementAndGet();
+                        decrementOutstandingTxnsIfNecessary();
+                    }
                 }
             });
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to