Author: orudyy
Date: Thu Aug 28 09:43:27 2014
New Revision: 1621106
URL: http://svn.apache.org/r1621106
Log:
QPID-6051: Fix handling of exceptions thrown from post commit or deferred
actions on transaction commit
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1621106&r1=1621105&r2=1621106&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Thu Aug 28 09:43:27 2014
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,9 +281,9 @@ public class LocalTransaction implements
public StoreFuture commitAsync(final Runnable deferred)
{
sync();
+ StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
try
{
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
if(_transaction != null)
{
future = new StoreFuture()
@@ -325,8 +327,7 @@ public class LocalTransaction implements
}
catch (RuntimeException e)
{
- doRollbackActions();
- throw e;
+ handleUnexpectedException(e);
}
finally
{
@@ -350,21 +351,40 @@ public class LocalTransaction implements
}
}
- return future;
}
catch (RuntimeException e)
{
try
{
- doRollbackActions();
+ handleUnexpectedException(e);
}
finally
{
resetDetails();
}
- throw e;
}
+ return future;
+ }
+ private void handleUnexpectedException(RuntimeException e)
+ {
+ if(e instanceof ConnectionScopedRuntimeException || e instanceof
TransportException)
+ {
+ throw e;
+ }
+ else
+ {
+ _logger.error("Unexpected exception on execution of post commit
deferred actions", e);
+ boolean continueOnError =
Boolean.getBoolean("qpid.broker.exceptionHandler.continue");
+ if (continueOnError)
+ {
+ throw e;
+ }
+ else
+ {
+ Runtime.getRuntime().halt(1);
+ }
+ }
}
private void doPostTransactionActions()
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1621106&r1=1621105&r2=1621106&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Thu Aug 28 09:43:27 2014
@@ -1109,10 +1109,16 @@ public class AMQChannel<T extends AMQPro
@Override
public void run()
{
- immediateAction.run();
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ try
+ {
+ immediateAction.run();
+ }
+ finally
+ {
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
}
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]