Author: ningjiang Date: Tue Jul 26 15:05:42 2011 New Revision: 1151126 URL: http://svn.apache.org/viewvc?rev=1151126&view=rev Log: CAMEL-4272 camel-jdbc should provide a option not set the autoCommit flag
Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java?rev=1151126&r1=1151125&r2=1151126&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java (original) +++ camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java Tue Jul 26 15:05:42 2011 @@ -31,6 +31,7 @@ import org.apache.camel.impl.DefaultEndp public class JdbcEndpoint extends DefaultEndpoint { private int readSize; private boolean transacted; + private boolean resetAutoCommit = true; private DataSource dataSource; private Map<String, Object> parameters; private boolean useJDBC4ColumnNameAndLabelSemantics = true; @@ -71,6 +72,14 @@ public class JdbcEndpoint extends Defaul this.transacted = transacted; } + public boolean isResetAutoCommit() { + return resetAutoCommit; + } + + public void setResetAutoCommit(boolean resetAutoCommit) { + this.resetAutoCommit = resetAutoCommit; + } + public DataSource getDataSource() { return dataSource; } Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java?rev=1151126&r1=1151125&r2=1151126&view=diff ============================================================================== --- camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java (original) +++ camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java Tue Jul 26 15:05:42 2011 @@ -58,23 +58,34 @@ public class JdbcProducer extends Defaul * Execute sql of exchange and set results on output */ public void process(Exchange exchange) throws Exception { + if (getEndpoint().isResetAutoCommit()) { + processingSqlBySettingAutoCommit(exchange); + } else { + processingSqlWithoutSettingAutoCommit(exchange); + } + // populate headers + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + } + + private void processingSqlBySettingAutoCommit(Exchange exchange) throws Exception { String sql = exchange.getIn().getBody(String.class); Connection conn = null; Statement stmt = null; ResultSet rs = null; Boolean autoCommit = null; - try { conn = dataSource.getConnection(); autoCommit = conn.getAutoCommit(); - conn.setAutoCommit(false); - + if (autoCommit) { + conn.setAutoCommit(false); + } + stmt = conn.createStatement(); - + if (parameters != null && !parameters.isEmpty()) { IntrospectionSupport.setProperties(stmt, parameters); } - + LOG.debug("Executing JDBC statement: {}", sql); if (stmt.execute(sql)) { @@ -98,9 +109,36 @@ public class JdbcProducer extends Defaul resetAutoCommit(conn, autoCommit); closeQuietly(conn); } + } - // populate headers - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + private void processingSqlWithoutSettingAutoCommit(Exchange exchange) throws Exception { + String sql = exchange.getIn().getBody(String.class); + Connection conn = null; + Statement stmt = null; + ResultSet rs = null; + try { + conn = dataSource.getConnection(); + + stmt = conn.createStatement(); + + if (parameters != null && !parameters.isEmpty()) { + IntrospectionSupport.setProperties(stmt, parameters); + } + + LOG.debug("Executing JDBC statement: {}", sql); + + if (stmt.execute(sql)) { + rs = stmt.getResultSet(); + setResultSet(exchange, rs); + } else { + int updateCount = stmt.getUpdateCount(); + exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount); + } + } finally { + closeQuietly(rs); + closeQuietly(stmt); + closeQuietly(conn); + } } private void closeQuietly(ResultSet rs) {