CAMEL-8715: camel-sql - Should close ResultSet
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cc9160ac Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cc9160ac Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cc9160ac Branch: refs/heads/master Commit: cc9160ac7e416decbddf9f545952f7a7c3ef5d80 Parents: cd95b8b Author: Claus Ibsen <[email protected]> Authored: Wed Apr 29 08:03:16 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Wed Apr 29 10:26:31 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/component/sql/SqlConsumer.java | 5 +- .../apache/camel/component/sql/SqlEndpoint.java | 5 +- .../apache/camel/component/sql/SqlProducer.java | 145 ++++++++++--------- 3 files changed, 82 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index a9e4b49..cdddd81 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -21,7 +21,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Queue; import org.apache.camel.Exchange; @@ -36,6 +35,8 @@ import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; +import static org.springframework.jdbc.support.JdbcUtils.closeResultSet; + public class SqlConsumer extends ScheduledBatchPollingConsumer { private final String query; @@ -110,7 +111,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { throw new IllegalArgumentException("Invalid outputType=" + outputType); } } finally { - rs.close(); + closeResultSet(rs); } // process all the exchanges in this batch http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java index 9ddbda9..507b071 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java @@ -346,10 +346,11 @@ public class SqlEndpoint extends DefaultPollingEndpoint { return "sql:" + UnsafeUriCharactersEncoder.encode(query); } + @SuppressWarnings("unchecked") protected List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException { if (allowMapToClass && outputClass != null) { - Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass); - RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz); + Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass); + RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz); RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper); List<?> data = mapper.extractData(rs); return data; http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index a4554a8..c5eda07 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -32,6 +32,8 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; import org.springframework.jdbc.core.PreparedStatementCreator; +import static org.springframework.jdbc.support.JdbcUtils.closeResultSet; + public class SqlProducer extends DefaultProducer { private String query; private JdbcTemplate jdbcTemplate; @@ -89,59 +91,48 @@ public class SqlProducer extends DefaultProducer { jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Map<?, ?>>() { public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException { - int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount(); - - // only populate if really needed - if (alwaysPopulateStatement || expected > 0) { - // transfer incoming message body data to prepared statement parameters, if necessary - if (batch) { - Iterator<?> iterator = exchange.getIn().getBody(Iterator.class); - while (iterator != null && iterator.hasNext()) { - Object value = iterator.next(); - Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value); + ResultSet rs = null; + try { + int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount(); + + // only populate if really needed + if (alwaysPopulateStatement || expected > 0) { + // transfer incoming message body data to prepared statement parameters, if necessary + if (batch) { + Iterator<?> iterator = exchange.getIn().getBody(Iterator.class); + while (iterator != null && iterator.hasNext()) { + Object value = iterator.next(); + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value); + sqlPrepareStatementStrategy.populateStatement(ps, i, expected); + ps.addBatch(); + } + } else { + Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody()); sqlPrepareStatementStrategy.populateStatement(ps, i, expected); - ps.addBatch(); } - } else { - Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody()); - sqlPrepareStatementStrategy.populateStatement(ps, i, expected); } - } - boolean isResultSet = false; + boolean isResultSet = false; - // execute the prepared statement and populate the outgoing message - if (batch) { - int[] updateCounts = ps.executeBatch(); - int total = 0; - for (int count : updateCounts) { - total += count; - } - exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total); - } else { - isResultSet = ps.execute(); - if (isResultSet) { - // preserve headers first, so we can override the SQL_ROW_COUNT header - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); - - ResultSet rs = ps.getResultSet(); - SqlOutputType outputType = getEndpoint().getOutputType(); - log.trace("Got result list from query: {}, outputType={}", rs, outputType); - if (outputType == SqlOutputType.SelectList) { - List<?> data = getEndpoint().queryForList(rs, true); - // for noop=true we still want to enrich with the row count header - if (getEndpoint().isNoop()) { - exchange.getOut().setBody(exchange.getIn().getBody()); - } else if (getEndpoint().getOutputHeader() != null) { - exchange.getOut().setBody(exchange.getIn().getBody()); - exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); - } else { - exchange.getOut().setBody(data); - } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size()); - } else if (outputType == SqlOutputType.SelectOne) { - Object data = getEndpoint().queryForObject(rs); - if (data != null) { + // execute the prepared statement and populate the outgoing message + if (batch) { + int[] updateCounts = ps.executeBatch(); + int total = 0; + for (int count : updateCounts) { + total += count; + } + exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total); + } else { + isResultSet = ps.execute(); + if (isResultSet) { + // preserve headers first, so we can override the SQL_ROW_COUNT header + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + + rs = ps.getResultSet(); + SqlOutputType outputType = getEndpoint().getOutputType(); + log.trace("Got result list from query: {}, outputType={}", rs, outputType); + if (outputType == SqlOutputType.SelectList) { + List<?> data = getEndpoint().queryForList(rs, true); // for noop=true we still want to enrich with the row count header if (getEndpoint().isNoop()) { exchange.getOut().setBody(exchange.getIn().getBody()); @@ -151,35 +142,51 @@ public class SqlProducer extends DefaultProducer { } else { exchange.getOut().setBody(data); } - exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1); + exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size()); + } else if (outputType == SqlOutputType.SelectOne) { + Object data = getEndpoint().queryForObject(rs); + if (data != null) { + // for noop=true we still want to enrich with the row count header + if (getEndpoint().isNoop()) { + exchange.getOut().setBody(exchange.getIn().getBody()); + } else if (getEndpoint().getOutputHeader() != null) { + exchange.getOut().setBody(exchange.getIn().getBody()); + exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data); + } else { + exchange.getOut().setBody(data); + } + exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1); + } + } else { + throw new IllegalArgumentException("Invalid outputType=" + outputType); } } else { - throw new IllegalArgumentException("Invalid outputType=" + outputType); + exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); } - } else { - exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); } - } - if (shouldRetrieveGeneratedKeys) { - // if no OUT message yet then create one and propagate headers - if (!exchange.hasOut()) { - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); - } + if (shouldRetrieveGeneratedKeys) { + // if no OUT message yet then create one and propagate headers + if (!exchange.hasOut()) { + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + } - if (isResultSet) { - // we won't return generated keys for SELECT statements - exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST); - exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, 0); - } else { - List<?> generatedKeys = getEndpoint().queryForList(ps.getGeneratedKeys(), false); - exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, generatedKeys); - exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, generatedKeys.size()); + if (isResultSet) { + // we won't return generated keys for SELECT statements + exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST); + exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, 0); + } else { + List<?> generatedKeys = getEndpoint().queryForList(ps.getGeneratedKeys(), false); + exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, generatedKeys); + exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, generatedKeys.size()); + } } - } - // data is set on exchange so return null - return null; + // data is set on exchange so return null + return null; + } finally { + closeResultSet(rs); + } } }); }
