Author: davsclaus
Date: Mon Jun 30 23:32:47 2008
New Revision: 673008
URL: http://svn.apache.org/viewvc?rev=673008&view=rev
Log:
CAMEL-634: Applied patch with thanks to Marat. Added unit test for
transactional DataSource.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
activemq/camel/trunk/components/camel-spring/pom.xml
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
Mon Jun 30 23:32:47 2008
@@ -170,6 +170,11 @@
boolean isFailed();
/**
+ * Returns true if this exchange is transacted
+ */
+ boolean isTransacted();
+
+ /**
* Returns the container so that a processor can resolve endpoints from
URIs
*
* @return the container which owns this exchange
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java?rev=673008&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,135 @@
+package org.apache.camel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents an instance and a type safe registry of well known Camel
Exchange properties.
+ * <p/>
+ * <b>Usage pattern:</b>
+ * <br/>In your code register a property that you wish to pass via Camel
Exchange:
+ * <pre>
+ * public static final ExchangeProperty<Boolean> myProperty =
+ * new ExchangeProperty<Boolean>("myProperty",
"org.apache.myproject.mypackage.myproperty", Boolean.class);
+ *
+ * Then in your code set this property's value:
+ * myProperty.set(exchange, Boolean.TRUE);
+ *
+ * Check the value of this property where required:
+ * ExchangeProperty<?> property = ExchangeProperty.get("myProperty");
+ * if (property != null && property.get(exchange) == Boolean.TRUE) {
+ * // do your thing ...
+ * }
+ * Or
+ * Boolean value = myProperty.get(exchange);
+ * if (value == Boolean.TRUE) {
+ * // do your thing
+ * }
+ *
+ * When your code no longer requires this property then deregister it:
+ * ExchangeProperty.deregister(myProperty);
+ * Or
+ * ExchangeProperty.deregister("myProperty");
+ * </pre>
+ *
+ * <b>Note:</b> that if ExchangeProperty instance get or set methods are used
then type checks
+ * of property's value are performed and a runtime exception can be thrown if
type
+ * safety is violated.
+ */
+public class ExchangeProperty<T> {
+ private final String literal;
+ private final String name;
+ private final Class<T> type;
+
+ private static final List<ExchangeProperty<?>> values =
+ new ArrayList<ExchangeProperty<?>>();
+
+ private static final Map<String, ExchangeProperty<?>> literalMap =
+ new HashMap<String, ExchangeProperty<?>>();
+
+ private static final Map<String, ExchangeProperty<?>> nameMap =
+ new HashMap<String, ExchangeProperty<?>>();
+
+ public ExchangeProperty(String literal, String name, Class<T> type) {
+ this.literal = literal;
+ this.name = name;
+ this.type = type;
+ register(this);
+ }
+
+ public String literal() {
+ return literal;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public Class<T> type() {
+ return type;
+ }
+
+ public T get(Exchange exchange) {
+ return exchange.getProperty(name, type);
+ }
+
+ public static ExchangeProperty<?> get(String literal) {
+ return literalMap.get(literal);
+ }
+
+ public static ExchangeProperty<?> getByName(String name) {
+ return nameMap.get(name);
+ }
+
+ public T set(Exchange exchange, T value) {
+ T oldValue = get(exchange);
+ exchange.setProperty(name, value);
+ return oldValue;
+ }
+
+ public T remove(Exchange exchange) {
+ T oldValue = get(exchange);
+ exchange.removeProperty(name);
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return type().getCanonicalName() + " " + name + " (" + literal() + ")";
+ }
+
+ public static synchronized void register(ExchangeProperty<?> property) {
+ ExchangeProperty<?> existingProperty =
literalMap.get(property.literal());
+ if (existingProperty != null && existingProperty != property) {
+ throw new RuntimeCamelException("An Exchange Property '" +
property.literal()
+ + "' has already been registered; its traits are: " +
existingProperty.toString());
+ }
+ values.add(property);
+ literalMap.put(property.literal(), property);
+ nameMap.put(property.name(), property);
+ }
+
+ public static synchronized void deregister(ExchangeProperty<?> property) {
+ if (property != null) {
+ values.remove(property);
+ literalMap.remove(property.literal());
+ nameMap.put(property.name(), property);
+ }
+ }
+
+ public static synchronized void deregister(String literal) {
+ ExchangeProperty<?> property = literalMap.get(literal);
+ if (property != null) {
+ values.remove(property);
+ literalMap.remove(property.literal());
+ nameMap.put(property.name(), property);
+ }
+ }
+
+ public static synchronized ExchangeProperty<?>[] values() {
+ return values.toArray(new ExchangeProperty[0]);
+ }
+
+}
\ No newline at end of file
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Mon Jun 30 23:32:47 2008
@@ -22,6 +22,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeProperty;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.UnitOfWork;
@@ -33,6 +34,7 @@
* @version $Revision$
*/
public class DefaultExchange implements Exchange {
+
private static final UuidGenerator DEFAULT_ID_GENERATOR = new
UuidGenerator();
protected final CamelContext context;
private Map<String, Object> properties;
@@ -133,13 +135,39 @@
public <T> T getProperty(String name, Class<T> type) {
Object value = getProperty(name);
+
+ // if the property is also a well known property in ExchangeProperty
then validate that the
+ // value is of the same type
+ ExchangeProperty<?> property = ExchangeProperty.getByName(name);
+ if (property != null) {
+ validateExchangePropertyIsExpectedType(property, type, value);
+ }
+
return getContext().getTypeConverter().convertTo(type, value);
}
public void setProperty(String name, Object value) {
+ ExchangeProperty<?> property = ExchangeProperty.getByName(name);
+
+ // if the property is also a well known property in ExchangeProperty
then validate that the
+ // value is of the same type
+ if (property != null) {
+ Class type = value.getClass();
+ validateExchangePropertyIsExpectedType(property, type, value);
+ }
+
getProperties().put(name, value);
}
+ private <T> void
validateExchangePropertyIsExpectedType(ExchangeProperty<?> property, Class<T>
type, Object value) {
+ if (value != null && property != null &&
!property.type().isAssignableFrom(type)) {
+ throw new RuntimeCamelException("Type cast exception while getting
an "
+ + "Exchange Property value '" + value.toString()
+ + "' on Exchange " + this
+ + " for a well known Exchange Property with these traits:
" + property);
+ }
+ }
+
public Object removeProperty(String name) {
return getProperties().remove(name);
}
@@ -253,6 +281,11 @@
return getException() != null;
}
+ /**
+ * Returns true if the well known "transacted" exchange property is registered
+ * and its value on this exchange is true.
+ */
+ public boolean isTransacted() {
+ ExchangeProperty<?> property = ExchangeProperty.get("transacted");
+ // compare by value: the property is read through the type converter, which
+ // is not guaranteed to hand back the canonical Boolean.TRUE instance
+ return property != null && Boolean.TRUE.equals(property.get(this));
+ }
+
public UnitOfWork getUnitOfWork() {
return unitOfWork;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Mon Jun 30 23:32:47 2008
@@ -21,6 +21,7 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
@@ -93,7 +94,6 @@
public boolean process(final Exchange exchange, final AsyncCallback
callback, final RedeliveryData data) {
while (true) {
-
// We can't keep retrying if the route is being shutdown.
if (!isRunAllowed()) {
if (exchange.getException() == null) {
@@ -103,6 +103,8 @@
return data.sync;
}
+ resetMaxDeliveryIfTransacted(exchange);
+
if (exchange.getException() != null) {
Throwable e = exchange.getException();
exchange.setException(null); // Reset it since we are handling
it.
@@ -143,6 +145,7 @@
exchange.setProperty(EXCEPTION_CAUSE_PROPERTY,
exchange.getException());
exchange.setException(null);
+
boolean sync = outputAsync.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
// Only handle the async case...
@@ -171,6 +174,12 @@
}
+ public void resetMaxDeliveryIfTransacted(Exchange exchange) {
+ if (exchange.isTransacted()) {
+ redeliveryPolicy.setMaximumRedeliveries(1);
+ }
+ }
+
public static boolean isFailureHandled(Exchange exchange) {
return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
}
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java?rev=673008&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import org.apache.camel.impl.DefaultExchange;
+
+public class ExchangePropertyTest extends ExchangeTestSupport {
+ protected static final String P1_NAME =
"org.apache.myproject.mypackage.myproperty1";
+ protected static final String P2_NAME =
"org.apache.myproject.mypackage.myproperty2";
+
+ protected Exchange exchange;
+
+ public void testExchangePropertyRegistry() throws Exception {
+ ExchangeProperty<Boolean> myProperty1 =
+ new ExchangeProperty<Boolean>("myProperty1", P1_NAME,
Boolean.class);
+
+ assertEquals(ExchangeProperty.get("myProperty1"), myProperty1);
+ assertEquals(ExchangeProperty.values().length, 1);
+ assertEquals(ExchangeProperty.values()[0], myProperty1);
+
+ ExchangeProperty<Boolean> myProperty2 =
+ new ExchangeProperty<Boolean>("myProperty2", P2_NAME,
Boolean.class);
+
+ assertEquals(ExchangeProperty.get("myProperty2"), myProperty2);
+ assertEquals(ExchangeProperty.values().length, 2);
+ assertEquals(ExchangeProperty.values()[1], myProperty2);
+
+ try {
+ ExchangeProperty<Boolean> rejectedProperty =
+ new ExchangeProperty<Boolean>("myProperty2", P2_NAME,
Boolean.class);
+ fail("Expected RuntimeCamelException to be thrown due to duplicate
property "
+ + " registration attempt");
+ } catch (RuntimeCamelException e) {
+ assertEquals(ExchangeProperty.values().length, 2);
+ } catch (Throwable t) {
+ fail("Expected RuntimeCamelException to be thrown due to duplicate
propery "
+ + " registration attempt");
+ }
+ ExchangeProperty.deregister(myProperty1);
+ assertEquals(ExchangeProperty.get("myProperty1"), null);
+ ExchangeProperty.deregister("myProperty2");
+ assertEquals(ExchangeProperty.get("myProperty2"), null);
+ assertEquals(ExchangeProperty.values().length, 0);
+ }
+
+ public void testExchangePropertySetterGetter() throws Exception {
+ Exchange exchange = createExchange();
+
+ ExchangeProperty<Boolean> myProperty1 =
+ new ExchangeProperty<Boolean>("myProperty1", P1_NAME,
Boolean.class);
+
+ ExchangeProperty<String> myProperty2 =
+ new ExchangeProperty<String>("myProperty2", P2_NAME, String.class);
+
+ myProperty1.set(exchange, Boolean.TRUE);
+ assertTrue("Unexpected property value",
+ myProperty1.get(exchange) == Boolean.TRUE);
+ assertTrue("Unexpected property value",
+ ExchangeProperty.get("myProperty1").get(exchange) ==
Boolean.TRUE);
+
+ myProperty2.set(exchange, "camel");
+ assertTrue("Unexpected property value",
+ myProperty2.get(exchange).equals("camel"));
+ assertTrue("Unexpected property value",
+
ExchangeProperty.get("myProperty2").get(exchange).equals("camel"));
+
+ ExchangeProperty.deregister(myProperty1);
+ assertEquals(ExchangeProperty.get("myProperty1"), null);
+ ExchangeProperty.deregister("myProperty2");
+ assertEquals(ExchangeProperty.get("myProperty2"), null);
+ assertEquals(ExchangeProperty.values().length, 0);
+ }
+
+ public void testExchangePropertyTypeSafety() throws Exception {
+ Exchange exchange = createExchange();
+ ExchangeProperty<Boolean> myProperty1 =
+ new ExchangeProperty<Boolean>("myProperty1", P1_NAME,
Boolean.class);
+ try {
+ exchange.setProperty(P1_NAME, "camel");
+ fail("Expected RuntimeCamelException to be thrown due to property
value type cast violation");
+ } catch (RuntimeCamelException e) {
+ // complete
+ } catch (Throwable t) {
+ fail("Expected RuntimeCamelException to be thrown due to property
value type cast violation");
+ }
+
+ myProperty1.set(exchange, Boolean.TRUE);
+
+ assertTrue("Unexpected property value",
+ myProperty1.get(exchange) == Boolean.TRUE);
+ assertTrue("Unexpected property value",
+ ExchangeProperty.get("myProperty1").get(exchange) ==
Boolean.TRUE);
+
+ ExchangeProperty.deregister(myProperty1);
+ assertEquals(ExchangeProperty.get("myProperty1"), null);
+ assertEquals(ExchangeProperty.values().length, 0);
+ }
+}
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Mon Jun 30 23:32:47 2008
@@ -55,9 +55,10 @@
}
public void onMessage(final Message message) {
+ RuntimeCamelException rce = null;
try {
if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " receiving JMS message: " + message);
+ LOG.debug(endpoint + " consumer receiving JMS message: " +
message);
}
Destination replyDestination = getReplyToDestination(message);
final JmsExchange exchange = createExchange(message,
replyDestination);
@@ -65,13 +66,20 @@
exchange.getIn().getHeaders();
}
processor.process(exchange);
-
final JmsMessage out = exchange.getOut(false);
- if (out != null && !disableReplyTo) {
+ if (exchange.getException() != null) {
+ rce = new RuntimeCamelException(exchange.getException());
+ }
+ if (rce == null && out != null && !disableReplyTo) {
sendReply(replyDestination, message, exchange, out);
}
} catch (Exception e) {
- throw new RuntimeCamelException(e);
+ rce = new RuntimeCamelException(e);
+ }
+ if (rce != null) {
+ LOG.warn(endpoint + " consumer caught an exception while
processing "
+ + "JMS message: " + message, rce);
+ throw rce;
}
}
Modified:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
Mon Jun 30 23:32:47 2008
@@ -30,23 +30,31 @@
private static final transient Log LOG =
LogFactory.getLog(ConditionalExceptionProcessor.class);
private int count;
+ private int maxCalls;
+ private String errorMsg;
public ConditionalExceptionProcessor() {
+ this.maxCalls = 1;
+ }
+
+ public ConditionalExceptionProcessor(int maxCalls) {
+ this.maxCalls = maxCalls;
}
public void process(Exchange exchange) throws Exception {
setCount(getCount() + 1);
- AbstractTransactionTest.assertTrue("Expected only 2 calls to process()
but encountered "
- + getCount() + ". There should be 1 for intentionally triggered
rollback, and 1 for redelivery.",
- getCount() <= 2);
-
+ if (getCount() > maxCalls * 2) {
+ errorMsg = "Expected only " + maxCalls * 2 + " calls to process()
but encountered "
+ + getCount() + ". There should be 1 for intentionally triggered
rollback, and 1 for redelivery for each call.";
+ }
+
// should be printed 2 times due to one re-delivery after one failure
- LOG.info("Exchange[" + getCount() + "][" + ((getCount() <= 1) ?
"Should rollback" : "Should succeed")
+ LOG.info("Exchange[" + getCount() + "][" + ((getCount() % 2 != 0) ?
"Should rollback" : "Should succeed")
+ "] = " + exchange);
- // force rollback on the second attempt
- if (getCount() <= 1) {
+ // force rollback on every mod 2 attempt
+ if (getCount() % 2 != 0) {
throw new Exception("Rollback should be intentionally triggered:
count[" + getCount() + "].");
}
}
@@ -58,4 +66,8 @@
public int getCount() {
return count;
}
+
+ public String getErrorMessage() {
+ return errorMsg;
+ }
}
Modified:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
Mon Jun 30 23:32:47 2008
@@ -44,14 +44,13 @@
public void
testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector()
throws Exception {
JmsComponent c = (JmsComponent)context.getComponent("activemq");
- // c.getConfiguration().setRequestTimeout(600000);
JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
-
+ final ConditionalExceptionProcessor cp = new
ConditionalExceptionProcessor(10);
context.addRoutes(new SpringRouteBuilder() {
@Override
public void configure() throws Exception {
Policy required = bean(SpringTransactionPolicy.class,
"PROPAGATION_REQUIRED_POLICY");
-
from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new
ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+
from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(cp).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
from("activemq-1:queue:bar").process(new Processor() {
public void process(Exchange e) {
String request = e.getIn().getBody(String.class);
@@ -66,8 +65,11 @@
}
});
- Object reply = template.requestBody("activemq:queue:foo", "blah");
- assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
+ for (int i = 0; i < 10; ++i) {
+ Object reply = template.requestBody("activemq:queue:foo", "blah" +
i);
+ assertTrue("Received unexpeced reply", reply.equals("Re: blah" +
i));
+ assertTrue(cp.getErrorMessage(), cp.getErrorMessage() == null);
+ }
}
/*
* This is a working test but is commented out because there is bug in that
ConditionalExceptionProcessor
Modified: activemq/camel/trunk/components/camel-spring/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/pom.xml?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/pom.xml (original)
+++ activemq/camel/trunk/components/camel-spring/pom.xml Mon Jun 30 23:32:47
2008
@@ -104,6 +104,17 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jdbc</artifactId>
+ <version>${spring-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>test</scope>
Modified:
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
(original)
+++
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
Mon Jun 30 23:32:47 2008
@@ -17,46 +17,100 @@
package org.apache.camel.spring.spi;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.DelegateProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.DefaultTransactionStatus;
import
org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import
org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
/**
+ * The <a
href="http://activemq.apache.org/camel/transactional-client.html">Transactional
Client</a>
+ * EIP pattern.
+ *
* @version $Revision$
*/
public class TransactionInterceptor extends DelegateProcessor {
private static final transient Log LOG =
LogFactory.getLog(TransactionInterceptor.class);
private final TransactionTemplate transactionTemplate;
+ public static final ExchangeProperty<Boolean> TRANSACTED =
+ new ExchangeProperty<Boolean>("transacted",
"org.apache.camel.transacted", Boolean.class);
+
public TransactionInterceptor(TransactionTemplate transactionTemplate) {
this.transactionTemplate = transactionTemplate;
}
public void process(final Exchange exchange) {
- LOG.info("transaction begin");
+ LOG.debug("Transaction begin");
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus
status) {
+ // wrapper exception to throw if the exchange failed
+ // IMPORTANT: Must be a runtime exception to let Spring regard
it as to do "rollback"
+ RuntimeCamelException rce = null;
+
+ boolean activeTx = false;
try {
+ // find out if there is an actual transaction alive, and
thus we are in transacted mode
+ activeTx =
TransactionSynchronizationManager.isActualTransactionActive();
+ if (!activeTx) {
+ activeTx = status.isNewTransaction() &&
!status.isCompleted();
+ if (!activeTx) {
+ if
(DefaultTransactionStatus.class.isAssignableFrom(status.getClass())) {
+ DefaultTransactionStatus defStatus =
DefaultTransactionStatus.class
+ .cast(status);
+ activeTx = defStatus.hasTransaction() &&
!status.isCompleted();
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Is actual transaction active: " + activeTx);
+ }
+
+ // okay mark the exchange as transacted, so that the DeadLetterChannel or others know
+ // it is a transacted exchange
+ if (activeTx) {
+ TRANSACTED.set(exchange, Boolean.TRUE);
+ }
+
+
+ // process the exchange
processNext(exchange);
+
+ // wrap if the exchange failed with an exception
+ if (exchange.getException() != null) {
+ rce = new
RuntimeCamelException(exchange.getException());
+ }
} catch (Exception e) {
- throw new RuntimeCamelException(e);
+ // wrap if the exchange threw an exception
+ rce = new RuntimeCamelException(e);
+ }
+
+ // rethrow exception if the exchange failed
+ if (rce != null) {
+ if (activeTx) {
+ status.setRollbackOnly();
+ LOG.debug("Transaction rollback");
+ }
+ throw rce;
}
}
});
-
- LOG.info("transaction commit");
+
+ LOG.debug("Transaction commit");
}
@Override
public String toString() {
- return "TransactionInterceptor:" +
propagationBehaviorToString(transactionTemplate.getPropagationBehavior()) + "["
+ getProcessor() + "]";
+ return "TransactionInterceptor:"
+ +
propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
+ + "[" + getProcessor() + "]";
}
private String propagationBehaviorToString(int propagationBehavior) {
@@ -84,8 +138,9 @@
rc = "PROPAGATION_SUPPORTS";
break;
default:
- rc = "UNKOWN";
+ rc = "UNKOWN";
}
return rc;
}
+
}
Added:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java?rev=673008&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
(added)
+++
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import javax.sql.DataSource;
+
+import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
+
+/**
+ * Used for unit testing
+ */
+// START SNIPPET: e1
+public class BookService {
+
+ private SimpleJdbcTemplate jdbc;
+
+ public BookService() {
+ }
+
+ public void setDataSource(DataSource ds) {
+ jdbc = new SimpleJdbcTemplate(ds);
+ }
+
+ public void orderBook(String title) throws Exception {
+ if (title.startsWith("Donkey")) {
+ throw new IllegalArgumentException("We don't have Donkeys, only
Camels");
+ }
+
+ // create new local datasource to store in DB
+ jdbc.update("insert into books (title) values (?)", title);
+ }
+}
+// END SNIPPET: e1
Added:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java?rev=673008&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
(added)
+++
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.SpringTestSupport;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Unit test to demonstrate the transactional client pattern.
+ */
+public class TransactionalClientDataSourceTest extends SpringTestSupport {
+
+    private JdbcTemplate jdbc;
+
+    protected ClassPathXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext(
+            "/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml");
+    }
+
+    protected int getExpectedRouteCount() {
+        return 0;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        this.disableJMX();
+        super.setUp();
+
+        // START SNIPPET: e5
+        // create database and insert dummy data
+        final DataSource ds = getMandatoryBean(DataSource.class, "dataSource");
+        jdbc = new JdbcTemplate(ds);
+        jdbc.execute("create table books (title varchar(50))");
+        jdbc.update("insert into books (title) values (?)", new Object[] {"Camel in Action"});
+        // END SNIPPET: e5
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        jdbc.execute("drop table books");
+        this.enableJMX();
+    }
+
+    // START SNIPPET: e3
+    public void testTransactionSuccess() throws Exception {
+        template.sendBody("direct:okay", "Hello World");
+
+        int count = jdbc.queryForInt("select count(*) from books");
+        assertEquals("Number of books", 3, count);
+    }
+    // END SNIPPET: e3
+
+    // START SNIPPET: e4
+    public void testTransactionRollback() throws Exception {
+        template.sendBody("direct:fail", "Hello World");
+
+        int count = jdbc.queryForInt("select count(*) from books");
+        assertEquals("Number of books", 1, count);
+    }
+    // END SNIPPET: e4
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // setup the transaction policy
+                TransactionTemplate tt = context.getRegistry()
+                    .lookup("PROPAGATION_REQUIRED", TransactionTemplate.class);
+                SpringTransactionPolicy required = new SpringTransactionPolicy(tt);
+
+                // set the required policy for this route
+                from("direct:okay").policy(required).
+                    setBody(constant("Tiger in Action")).beanRef("bookService").
+                    setBody(constant("Elephant in Action")).beanRef("bookService");
+                // END SNIPPET: e1
+
+                // START SNIPPET: e2
+                // set the required policy for this route; its second order is a Donkey title
+                from("direct:fail").policy(required).
+                    setBody(constant("Tiger in Action")).beanRef("bookService").
+                    setBody(constant("Donkey in Action")).beanRef("bookService");
+                // END SNIPPET: e2
+            }
+        };
+    }
+
+}
Added:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml?rev=673008&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
(added)
+++
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+ <!-- datasource to the database -->
+ <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
+ <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+ <property name="url" value="jdbc:hsqldb:mem:camel"/>
+ <property name="username" value="sa"/>
+ <property name="password" value=""/>
+ </bean>
+
+ <!-- spring transaction manager -->
+ <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+ <property name="dataSource" ref="dataSource"/>
+ </bean>
+
+ <!-- policy for required transaction used in our Camel routes -->
+ <bean id="PROPAGATION_REQUIRED" class="org.springframework.transaction.support.TransactionTemplate">
+ <property name="transactionManager" ref="txManager"/>
+ </bean>
+
+ <!-- bean for book business logic -->
+ <bean id="bookService" class="org.apache.camel.spring.interceptor.BookService">
+ <property name="dataSource" ref="dataSource"/>
+ </bean>
+
+</beans>