Author: cmueller
Date: Tue Apr 26 19:58:17 2011
New Revision: 1096880
URL: http://svn.apache.org/viewvc?rev=1096880&view=rev
Log:
CAMEL-3803: Component camel-jdbc does not support Transactions
Thanks Heath Kesler for the patch
Modified:
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
Modified:
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
(original)
+++
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
Tue Apr 26 19:58:17 2011
@@ -30,6 +30,7 @@ import org.apache.camel.impl.DefaultEndp
*/
public class JdbcEndpoint extends DefaultEndpoint {
private int readSize;
+ private boolean transacted;
private DataSource dataSource;
private Map<String, Object> parameters;
private boolean useJDBC4ColumnNameAndLabelSemantics = true;
@@ -51,7 +52,7 @@ public class JdbcEndpoint extends Defaul
}
public Producer createProducer() throws Exception {
- return new JdbcProducer(this, dataSource, readSize, parameters);
+ return new JdbcProducer(this, dataSource, readSize, parameters);
}
public int getReadSize() {
@@ -62,6 +63,14 @@ public class JdbcEndpoint extends Defaul
this.readSize = readSize;
}
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
public DataSource getDataSource() {
return dataSource;
}
Modified:
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
(original)
+++
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
Tue Apr 26 19:58:17 2011
@@ -62,10 +62,13 @@ public class JdbcProducer extends Defaul
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
+
try {
conn = dataSource.getConnection();
+ conn.setAutoCommit(false);
+
stmt = conn.createStatement();
-
+
if (parameters != null && !parameters.isEmpty()) {
IntrospectionSupport.setProperties(stmt, parameters);
}
@@ -81,26 +84,54 @@ public class JdbcProducer extends Defaul
int updateCount = stmt.getUpdateCount();
exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount);
}
- } finally {
- try {
- if (rs != null) {
- rs.close();
- }
- if (stmt != null) {
- stmt.close();
- }
- if (conn != null) {
- conn.close();
- }
- } catch (SQLException e) {
- LOG.warn("Error closing JDBC resource: " + e, e);
+ conn.commit();
+ } catch (Exception e){
+ try{
+ conn.rollback();
+ } catch (SQLException sqle){
+ LOG.warn("Error on jdbc component rollback: " + sqle, sqle);
}
+ throw e;
+ } finally {
+ closeQuietly(rs);
+ closeQuietly(stmt);
+ closeQuietly(conn);
}
// populate headers
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
}
+ private void closeQuietly(ResultSet rs) {
+ if (rs != null) {
+ try{
+ rs.close();
+ } catch (SQLException sqle){
+ LOG.warn("Error by closing result set: " + sqle, sqle);
+ }
+ }
+ }
+
+ private void closeQuietly(Statement stmt) {
+ if (stmt != null) {
+ try{
+ stmt.close();
+ } catch (SQLException sqle){
+ LOG.warn("Error by closing statement: " + sqle, sqle);
+ }
+ }
+ }
+
+ private void closeQuietly(Connection con) {
+ if (con != null) {
+ try{
+ con.close();
+ } catch (SQLException sqle){
+ LOG.warn("Error by closing connection: " + sqle, sqle);
+ }
+ }
+ }
+
/**
* Sets the result from the ResultSet to the Exchange as its OUT body.
*/
Modified:
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
(original)
+++
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
Tue Apr 26 19:58:17 2011
@@ -21,6 +21,7 @@ import java.util.List;
import javax.sql.DataSource;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -39,6 +40,7 @@ public class JdbcOptionsTest extends Cam
private String password = "";
private DataSource ds;
+ @SuppressWarnings("rawtypes")
@Test
public void testReadSize() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
@@ -52,6 +54,58 @@ public class JdbcOptionsTest extends Cam
assertEquals(1, list.size());
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testInsertCommitO() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:resultTx");
+ mock.expectedMessageCount(1);
+ // insert 2 recs into table
+ template.sendBody("direct:startTx", "insert into customer values ('cust3', 'johnsmith');insert into customer values ('cust4', 'hkesler') ");
+
+ mock.assertIsSatisfied();
+
+ String body = mock.getExchanges().get(0).getIn().getBody(String.class);
+ assertNull(body);
+
+ // now test to see that they were inserted and committed properly
+ MockEndpoint mockTest = getMockEndpoint("mock:retrieve");
+ mockTest.expectedMessageCount(1);
+
+ template.sendBody("direct:retrieve", "select * from customer");
+
+ mockTest.assertIsSatisfied();
+
+ List list = mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class);
+ // both records were committed
+ assertEquals(4, list.size());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testInsertRollback() throws Exception {
+ // insert 2 records
+ try{
+ template.sendBody("direct:startTx", "insert into customer values ('cust3', 'johnsmith');insert into customer values ('cust3', 'hkesler')");
+ fail("Should have thrown a CamelExecutionException");
+ } catch (CamelExecutionException e) {
+ if (!e.getCause().getMessage().contains("Violation of unique constraint")) {
+ fail("Test did not throw the expected Constraint Violation Exception");
+ }
+ }
+
+ // check to see that they failed by getting a rec count from table
+ MockEndpoint mockTest = getMockEndpoint("mock:retrieve");
+ mockTest.expectedMessageCount(1);
+
+ template.sendBody("direct:retrieve", "select * from customer");
+
+ mockTest.assertIsSatisfied();
+
+ List list = mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class);
+ // all recs failed to insert
+ assertEquals(2, list.size());
+ }
+
@Test
public void testNoDataSourceInRegistry() throws Exception {
try {
@@ -73,6 +127,8 @@ public class JdbcOptionsTest extends Cam
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:start").to("jdbc:testdb?readSize=1").to("mock:result");
+ from("direct:retrieve").to("jdbc:testdb").to("mock:retrieve");
+ from("direct:startTx").to("jdbc:testdb?transacted=true").to("mock:resultTx");
}
};
}
@@ -84,7 +140,7 @@ public class JdbcOptionsTest extends Cam
ds = dataSource;
JdbcTemplate jdbc = new JdbcTemplate(ds);
- jdbc.execute("create table customer (id varchar(15), name varchar(10))");
+ jdbc.execute("create table customer (id varchar(15) PRIMARY KEY, name varchar(10))");
jdbc.execute("insert into customer values('cust1','jstrachan')");
jdbc.execute("insert into customer values('cust2','nsandhu')");
super.setUp();
Modified:
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
(original)
+++
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
Tue Apr 26 19:58:17 2011
@@ -55,14 +55,15 @@ public class JdbcProducerConcurrenctTest
doSendMessages(10, 5);
}
+ @SuppressWarnings("rawtypes")
private void doSendMessages(int files, int poolSize) throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(files);
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
- Map<Integer, Future> responses = new ConcurrentHashMap<Integer, Future>();
+ Map<Integer, Future<Object>> responses = new ConcurrentHashMap<Integer, Future<Object>>();
for (int i = 0; i < files; i++) {
final int index = i;
- Future out = executor.submit(new Callable<Object>() {
+ Future<Object> out = executor.submit(new Callable<Object>() {
public Object call() throws Exception {
int id = index % 2;
return template.requestBody("direct:start", "select * from customer where id = " + id);
Modified:
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
(original)
+++
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
Tue Apr 26 19:58:17 2011
@@ -38,6 +38,7 @@ public class JdbcStatementParametersTest
private String user = "sa";
private String password = "";
+ @SuppressWarnings("rawtypes")
@Test
public void testMax2Rows() throws Exception {
List rows = template.requestBody("direct:hello", "select * from customer order by id", List.class);
@@ -46,6 +47,7 @@ public class JdbcStatementParametersTest
assertEquals(2, context.getEndpoints().size());
}
+ @SuppressWarnings("rawtypes")
@Test
public void testMax5Rows() throws Exception {
List rows = template.requestBody("jdbc:testdb?statement.maxRows=5&statement.fetchSize=50", "select * from customer order by id", List.class);
@@ -54,6 +56,7 @@ public class JdbcStatementParametersTest
assertEquals(3, context.getEndpoints().size());
}
+ @SuppressWarnings("rawtypes")
@Test
public void testNoParameters() throws Exception {
List rows = template.requestBody("jdbc:testdb", "select * from customer order by id", List.class);