Author: jstrachan
Date: Fri Jul 4 05:55:06 2008
New Revision: 674036
URL: http://svn.apache.org/viewvc?rev=674036&view=rev
Log:
transaction error handler for
https://issues.apache.org/activemq/browse/CAMEL-663
Added:
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=674036&r1=674035&r2=674036&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Fri Jul 4 05:55:06 2008
@@ -146,8 +146,7 @@
if (data.redeliveryCounter > 0) {
// Figure out how long we should wait to resend this message.
- data.redeliveryDelay =
data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
- sleep(data.redeliveryDelay);
+ data.redeliveryDelay =
data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
}
exchange.setProperty(EXCEPTION_CAUSE_PROPERTY,
exchange.getException());
@@ -265,21 +264,6 @@
return next;
}
- protected void sleep(long redeliveryDelay) {
- if (redeliveryDelay > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for: " + redeliveryDelay + " millis until
attempting redelivery");
- }
- try {
- Thread.sleep(redeliveryDelay);
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Thread interrupted: " + e, e);
- }
- }
- }
- }
-
@Override
protected void doStart() throws Exception {
ServiceHelper.startServices(output, deadLetter);
Added:
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java?rev=674036&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
(added)
+++
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
Fri Jul 4 05:55:06 2008
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.spi;
+
+import org.apache.camel.builder.ErrorHandlerBuilderSupport;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+import org.springframework.transaction.support.TransactionTemplate;
+import org.springframework.beans.factory.InitializingBean;
+
+/**
+ * An error handler which rolls the transaction back if there is an error
+ * rather than using the dead letter channel and retry logic.
+ *
+ * A redelivery delay is also applied after a rollback.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class TransactionErrorHandlerBuilder extends ErrorHandlerBuilderSupport
implements Cloneable, InitializingBean {
+
+ private TransactionTemplate transactionTemplate;
+ private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+
+ public TransactionErrorHandlerBuilder() {
+ }
+
+ public TransactionTemplate getTransactionTemplate() {
+ return transactionTemplate;
+ }
+
+ public void setTransactionTemplate(TransactionTemplate
transactionTemplate) {
+ this.transactionTemplate = transactionTemplate;
+ }
+
+ public RedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ public ErrorHandlerBuilder copy() {
+ try {
+ return (ErrorHandlerBuilder) clone();
+ } catch (CloneNotSupportedException e) {
+ throw new Error("Clone should be supported: " + e, e);
+ }
+ }
+
+ public Processor createErrorHandler(RouteContext routeContext, Processor
processor) throws Exception {
+ return new TransactionInterceptor(processor, transactionTemplate,
redeliveryPolicy);
+ }
+
+ public void afterPropertiesSet() throws Exception {
+ ObjectHelper.notNull(transactionTemplate, "transactionTemplate");
+ }
+}
Propchange:
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java?rev=674036&r1=674035&r2=674036&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
(original)
+++
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
Fri Jul 4 05:55:06 2008
@@ -19,7 +19,9 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.Processor;
import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.transaction.TransactionDefinition;
@@ -40,16 +42,47 @@
new ExchangeProperty<Boolean>("transacted",
"org.apache.camel.transacted", Boolean.class);
private static final transient Log LOG =
LogFactory.getLog(TransactionInterceptor.class);
private final TransactionTemplate transactionTemplate;
+ private ThreadLocal<RedeliveryData> previousRollback = new
ThreadLocal<RedeliveryData>() {
+ @Override
+ protected RedeliveryData initialValue() {
+ return new RedeliveryData();
+ }
+ };
+ private RedeliveryPolicy redeliveryPolicy;
public TransactionInterceptor(TransactionTemplate transactionTemplate) {
this.transactionTemplate = transactionTemplate;
}
+ public TransactionInterceptor(Processor processor, TransactionTemplate
transactionTemplate) {
+ super(processor);
+ this.transactionTemplate = transactionTemplate;
+ }
+
+ public TransactionInterceptor(Processor processor, TransactionTemplate
transactionTemplate, RedeliveryPolicy redeliveryPolicy) {
+ this(processor, transactionTemplate);
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ @Override
+ public String toString() {
+ return "TransactionInterceptor:"
+ +
propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
+ + "[" + getProcessor() + "]";
+ }
+
public void process(final Exchange exchange) {
LOG.debug("Transaction begin");
+ final RedeliveryData redeliveryData = previousRollback.get();
+
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus
status) {
+ if (redeliveryPolicy != null &&
redeliveryData.previousRollback) {
+ // lets delay
+ redeliveryData.redeliveryDelay =
redeliveryPolicy.sleep(redeliveryData.redeliveryDelay);
+ }
+
// wrapper exception to throw if the exchange failed
// IMPORTANT: Must be a runtime exception so that Spring treats
it as a trigger for "rollback"
RuntimeCamelException rce = null;
@@ -92,6 +125,7 @@
// rethrow exception if the exchange failed
if (rce != null) {
+ redeliveryData.previousRollback = true;
if (activeTx) {
status.setRollbackOnly();
LOG.debug("Transaction rollback");
@@ -101,17 +135,27 @@
}
});
+ redeliveryData.previousRollback = false;
+ redeliveryData.redeliveryDelay = 0L;
+
LOG.debug("Transaction commit");
}
- @Override
- public String toString() {
- return "TransactionInterceptor:"
- +
propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
- + "[" + getProcessor() + "]";
+
+ public RedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ protected static class RedeliveryData {
+ boolean previousRollback;
+ long redeliveryDelay;
}
- private String propagationBehaviorToString(int propagationBehavior) {
+ protected String propagationBehaviorToString(int propagationBehavior) {
String rc;
switch (propagationBehavior) {
case TransactionDefinition.PROPAGATION_MANDATORY: