Repository: camel Updated Branches: refs/heads/master 9e0a204e2 -> 1fb191c91
CAMEL-7447 Allow to stream the result of a database query Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16c1d36d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16c1d36d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16c1d36d Branch: refs/heads/master Commit: 16c1d36d848c3ae02507babf8aae7a8dac9ebd83 Parents: c160402 Author: Antoine DESSAIGNE <antoine.dessai...@gmail.com> Authored: Thu May 22 11:53:54 2014 +0200 Committer: Antoine DESSAIGNE <antoine.dessai...@gmail.com> Committed: Thu May 22 11:53:54 2014 +0200 ---------------------------------------------------------------------- .../camel/component/jdbc/JdbcOutputType.java | 2 +- .../camel/component/jdbc/JdbcProducer.java | 244 +++++++++---------- .../camel/component/jdbc/ResultSetIterator.java | 188 ++++++++++++++ .../JdbcProducerOutputTypeStreamListTest.java | 68 ++++++ 4 files changed, 376 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java index 183735f..30ef9df 100644 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java @@ -17,5 +17,5 @@ package org.apache.camel.component.jdbc; public enum JdbcOutputType { - SelectOne, SelectList + SelectOne, SelectList, StreamList } http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java index 15850e8..9b8fad0 100644 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java @@ -19,20 +19,14 @@ package org.apache.camel.component.jdbc; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLDataException; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import javax.sql.DataSource; - +import java.util.*; +import javax.sql.*; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.spi.Synchronization; import org.apache.camel.util.IntrospectionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +69,8 @@ public class JdbcProducer extends DefaultProducer { String sql = exchange.getIn().getBody(String.class); Connection conn = null; Boolean autoCommit = null; + boolean shouldCloseResources = true; + try { conn = dataSource.getConnection(); autoCommit = conn.getAutoCommit(); @@ -82,7 +78,7 @@ public class JdbcProducer extends DefaultProducer { conn.setAutoCommit(false); } - createAndExecuteSqlStatement(exchange, sql, conn); + shouldCloseResources = createAndExecuteSqlStatement(exchange, sql, conn); conn.commit(); } catch (Exception e) { @@ -95,39 +91,45 @@ public class JdbcProducer extends DefaultProducer { } throw e; } finally { - resetAutoCommit(conn, autoCommit); - closeQuietly(conn); + if (shouldCloseResources) { + resetAutoCommit(conn, autoCommit); + closeQuietly(conn); + } } } private void processingSqlWithoutSettingAutoCommit(Exchange exchange) throws Exception { String sql = exchange.getIn().getBody(String.class); Connection conn = null; + boolean shouldCloseResources = true; + try { conn = dataSource.getConnection(); - createAndExecuteSqlStatement(exchange, sql, conn); + shouldCloseResources = createAndExecuteSqlStatement(exchange, sql, conn); } finally { - closeQuietly(conn); + if (shouldCloseResources) { + closeQuietly(conn); + } } } - private void createAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { + private boolean createAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { if (getEndpoint().isUseHeadersAsParameters()) { - doCreateAndExecuteSqlStatementWithHeaders(exchange, sql, conn); + return doCreateAndExecuteSqlStatementWithHeaders(exchange, sql, conn); } else { - doCreateAndExecuteSqlStatement(exchange, sql, conn); + return doCreateAndExecuteSqlStatement(exchange, sql, conn); } } - private void doCreateAndExecuteSqlStatementWithHeaders(Exchange exchange, String sql, Connection conn) throws Exception { + private boolean doCreateAndExecuteSqlStatementWithHeaders(Exchange exchange, String sql, Connection conn) throws Exception { PreparedStatement ps = null; ResultSet rs = null; + boolean shouldCloseResources = true; try { final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(sql, getEndpoint().isAllowNamedParameters()); - Boolean shouldRetrieveGeneratedKeys = - exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, Boolean.class); + Boolean shouldRetrieveGeneratedKeys = exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, Boolean.class); if (shouldRetrieveGeneratedKeys) { Object expectedGeneratedColumns = exchange.getIn().getHeader(JdbcConstants.JDBC_GENERATED_COLUMNS); @@ -139,8 +141,7 @@ public class JdbcProducer extends DefaultProducer { ps = conn.prepareStatement(preparedQuery, (int[]) expectedGeneratedColumns); } else { throw new IllegalArgumentException( - "Header specifying expected returning columns isn't an instance of String[] or int[] but " - + expectedGeneratedColumns.getClass()); + "Header specifying expected returning columns isn't an instance of String[] or int[] but " + expectedGeneratedColumns.getClass()); } } else { ps = conn.prepareStatement(preparedQuery); @@ -149,7 +150,8 @@ public class JdbcProducer extends DefaultProducer { int expectedCount = ps.getParameterMetaData().getParameterCount(); if (expectedCount > 0) { - Iterator<?> it = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expectedCount, exchange, exchange.getIn().getBody()); + Iterator<?> it = getEndpoint().getPrepareStatementStrategy() + .createPopulateIterator(sql, preparedQuery, expectedCount, exchange, exchange.getIn().getBody()); getEndpoint().getPrepareStatementStrategy().populateStatement(ps, it, expectedCount); } @@ -159,6 +161,7 @@ public class JdbcProducer extends DefaultProducer { if (stmtExecutionResult) { rs = ps.getResultSet(); setResultSet(exchange, rs); + shouldCloseResources = false; } else { int updateCount = ps.getUpdateCount(); exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount); @@ -168,14 +171,18 @@ public class JdbcProducer extends DefaultProducer { setGeneratedKeys(exchange, ps.getGeneratedKeys()); } } finally { - closeQuietly(rs); - closeQuietly(ps); + if (shouldCloseResources) { + closeQuietly(rs); + closeQuietly(ps); + } } + return shouldCloseResources; } - private void doCreateAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { + private boolean doCreateAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception { Statement stmt = null; ResultSet rs = null; + boolean shouldCloseResources = true; try { stmt = conn.createStatement(); @@ -186,8 +193,7 @@ public class JdbcProducer extends DefaultProducer { LOG.debug("Executing JDBC Statement: {}", sql); - Boolean shouldRetrieveGeneratedKeys = - exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, Boolean.class); + Boolean shouldRetrieveGeneratedKeys = exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, Boolean.class); boolean stmtExecutionResult; if (shouldRetrieveGeneratedKeys) { @@ -200,8 +206,7 @@ public class JdbcProducer extends DefaultProducer { stmtExecutionResult = stmt.execute(sql, (int[]) expectedGeneratedColumns); } else { throw new IllegalArgumentException( - "Header specifying expected returning columns isn't an instance of String[] or int[] but " - + expectedGeneratedColumns.getClass()); + "Header specifying expected returning columns isn't an instance of String[] or int[] but " + expectedGeneratedColumns.getClass()); } } else { stmtExecutionResult = stmt.execute(sql); @@ -210,6 +215,7 @@ public class JdbcProducer extends DefaultProducer { if (stmtExecutionResult) { rs = stmt.getResultSet(); setResultSet(exchange, rs); + shouldCloseResources = false; } else { int updateCount = stmt.getUpdateCount(); exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount); @@ -219,9 +225,12 @@ public class JdbcProducer extends DefaultProducer { setGeneratedKeys(exchange, stmt.getGeneratedKeys()); } } finally { - closeQuietly(rs); - closeQuietly(stmt); + if (shouldCloseResources) { + closeQuietly(rs); + closeQuietly(stmt); + } } + return shouldCloseResources; } private void closeQuietly(ResultSet rs) { @@ -264,18 +273,18 @@ public class JdbcProducer extends DefaultProducer { } } - /** * Sets the generated if any to the Exchange in headers : * - {@link JdbcConstants#JDBC_GENERATED_KEYS_ROW_COUNT} : the row count of generated keys * - {@link JdbcConstants#JDBC_GENERATED_KEYS_DATA} : the generated keys data * - * @param exchange The exchange where to store the generated keys + * @param exchange The exchange where to store the generated keys * @param generatedKeys The result set containing the generated keys */ protected void setGeneratedKeys(Exchange exchange, ResultSet generatedKeys) throws SQLException { if (generatedKeys != null) { - List<Map<String, Object>> data = extractResultSetData(generatedKeys); + ResultSetIterator iterator = new ResultSetIterator(generatedKeys, getEndpoint().isUseJDBC4ColumnNameAndLabelSemantics()); + List<Map<String, Object>> data = extractRows(iterator); exchange.getOut().setHeader(JdbcConstants.JDBC_GENERATED_KEYS_ROW_COUNT, data.size()); exchange.getOut().setHeader(JdbcConstants.JDBC_GENERATED_KEYS_DATA, data); @@ -286,112 +295,97 @@ public class JdbcProducer extends DefaultProducer { * Sets the result from the ResultSet to the Exchange as its OUT body. */ protected void setResultSet(Exchange exchange, ResultSet rs) throws SQLException { - JdbcOutputType outputType = getEndpoint().getOutputType(); + ResultSetIterator iterator = new ResultSetIterator(rs, getEndpoint().isUseJDBC4ColumnNameAndLabelSemantics()); - if (outputType == JdbcOutputType.SelectList) { - List<Map<String, Object>> data = extractResultSetData(rs); - exchange.getOut().setHeader(JdbcConstants.JDBC_ROW_COUNT, data.size()); - if (!data.isEmpty()) { - exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, data.get(0).keySet()); - } - exchange.getOut().setBody(data); + JdbcOutputType outputType = getEndpoint().getOutputType(); + exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, iterator.getColumnNames()); + if (outputType == JdbcOutputType.StreamList) { + exchange.getOut().setBody(iterator); + exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator)); + } else if (outputType == JdbcOutputType.SelectList) { + List<Map<String, Object>> list = extractRows(iterator); + exchange.getOut().setHeader(JdbcConstants.JDBC_ROW_COUNT, list.size()); + exchange.getOut().setBody(list); } else if (outputType == JdbcOutputType.SelectOne) { - Object obj = queryForObject(rs); - exchange.getOut().setBody(obj); + exchange.getOut().setBody(extractSingleRow(iterator)); } } - /** - * Extract the result from the ResultSet - * - * @param rs rs produced by the SQL request - * @return All the resulting rows containing each field of the ResultSet - */ - protected List<Map<String, Object>> extractResultSetData(ResultSet rs) throws SQLException { - ResultSetMetaData meta = rs.getMetaData(); - - // should we use jdbc4 or jdbc3 semantics - boolean jdbc4 = getEndpoint().isUseJDBC4ColumnNameAndLabelSemantics(); - - int count = meta.getColumnCount(); - List<Map<String, Object>> data = new ArrayList<Map<String, Object>>(); - int rowNumber = 0; - while (rs.next() && (readSize == 0 || rowNumber < readSize)) { - Map<String, Object> row = new LinkedHashMap<String, Object>(); - for (int i = 0; i < count; i++) { - int columnNumber = i + 1; - // use column label to get the name as it also handled SQL SELECT aliases - String columnName; - if (jdbc4) { - // jdbc 4 should use label to get the name - columnName = meta.getColumnLabel(columnNumber); - } else { - // jdbc 3 uses the label or name to get the name - try { - columnName = meta.getColumnLabel(columnNumber); - } catch (SQLException e) { - columnName = meta.getColumnName(columnNumber); - } - } - // use index based which should be faster - int columnType = meta.getColumnType(columnNumber); - if (columnType == Types.CLOB || columnType == Types.BLOB) { - row.put(columnName, rs.getString(columnNumber)); - } else { - row.put(columnName, rs.getObject(columnNumber)); - } + private List<Map<String, Object>> extractRows(ResultSetIterator iterator) { + try { + List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(); + int maxRowCount = readSize == 0 ? Integer.MAX_VALUE : readSize; + for (int i = 0; iterator.hasNext() && i < maxRowCount; i++) { + result.add(iterator.next()); } - data.add(row); - rowNumber++; + return result; + } finally { + iterator.close(); } - return data; } + private Object extractSingleRow(ResultSetIterator iterator) throws SQLException { + try { + if (!iterator.hasNext()) { + return null; + } - @SuppressWarnings("unchecked") - protected Object queryForObject(ResultSet rs) throws SQLException { - Object result = null; - List<Map<String, Object>> data = extractResultSetData(rs); - if (data.size() > 1) { - throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() + " count instead."); - } else if (data.size() == 1) { - if (getEndpoint().getOutputClass() == null) { - // Set content depend on number of column from query result - Map<String, Object> row = data.get(0); - if (row.size() == 1) { - result = row.values().iterator().next(); - } else { - result = row; - } + Map<String, Object> row = iterator.next(); + if (iterator.hasNext()) { + throw new SQLDataException("Query result not unique for outputType=SelectOne."); + } else if (getEndpoint().getOutputClass() != null) { + return newBeanInstance(row); + } else if (row.size() == 1) { + return row.values().iterator().next(); } else { - Class<?> outputClzz = getEndpoint().getCamelContext().getClassResolver().resolveClass(getEndpoint().getOutputClass()); - Object answer = getEndpoint().getCamelContext().getInjector().newInstance(outputClzz); + return row; + } + } finally { + iterator.close(); + } + } - Map<String, Object> row = data.get(0); - Map<String, Object> properties = new LinkedHashMap<String, Object>(data.size()); + private Object newBeanInstance(Map<String, Object> row) throws SQLException { + Class<?> outputClass = getEndpoint().getCamelContext().getClassResolver().resolveClass(getEndpoint().getOutputClass()); + Object answer = getEndpoint().getCamelContext().getInjector().newInstance(outputClass); - // map row names using the bean row mapper - for (Map.Entry<String, Object> entry : row.entrySet()) { - Object value = entry.getValue(); - String name = getEndpoint().getBeanRowMapper().map(entry.getKey(), value); - properties.put(name, value); - } - try { - IntrospectionSupport.setProperties(answer, properties); - } catch (Exception e) { - throw new SQLException("Error setting properties on output class " + outputClzz, e); - } + Map<String, Object> properties = new LinkedHashMap<String, Object>(); - // check we could map all properties to the bean - if (!properties.isEmpty()) { - throw new IllegalArgumentException("Cannot map all properties to bean of type " + outputClzz + ". There are " + properties.size() + " unmapped properties. " + properties); - } - return answer; - } + // map row names using the bean row mapper + for (Map.Entry<String, Object> entry : row.entrySet()) { + Object value = entry.getValue(); + String name = getEndpoint().getBeanRowMapper().map(entry.getKey(), value); + properties.put(name, value); + } + try { + IntrospectionSupport.setProperties(answer, properties); + } catch (Exception e) { + throw new SQLException("Error setting properties on output class " + outputClass, e); } - // If data.size is zero, let result be null. - return result; + // check we could map all properties to the bean + if (!properties.isEmpty()) { + throw new IllegalArgumentException( + "Cannot map all properties to bean of type " + outputClass + ". There are " + properties.size() + " unmapped properties. " + properties); + } + return answer; } + private static final class ResultSetIteratorCompletion implements Synchronization { + private final ResultSetIterator iterator; + + private ResultSetIteratorCompletion(ResultSetIterator iterator) { + this.iterator = iterator; + } + + @Override + public void onComplete(Exchange exchange) { + iterator.close(); + } + + @Override + public void onFailure(Exchange exchange) { + iterator.close(); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java new file mode 100644 index 0000000..f471124 --- /dev/null +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jdbc; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.*; +import java.util.concurrent.atomic.*; +import org.apache.camel.RuntimeCamelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResultSetIterator implements Iterator<Map<String, Object>> { + private static final Logger LOG = LoggerFactory.getLogger(ResultSetIterator.class); + + private final Connection connection; + private final Statement statement; + private final ResultSet resultSet; + private final Column[] columns; + private final AtomicBoolean closed = new AtomicBoolean(); + + public ResultSetIterator(ResultSet resultSet, boolean isJDBC4) throws SQLException { + this.resultSet = resultSet; + this.statement = this.resultSet.getStatement(); + this.connection = this.statement.getConnection(); + + ResultSetMetaData metaData = resultSet.getMetaData(); + columns = new Column[metaData.getColumnCount()]; + for (int i = 0; i < columns.length; i++) { + int columnNumber = i + 1; + String columnName = getColumnName(metaData, columnNumber, isJDBC4); + int columnType = metaData.getColumnType(columnNumber); + if (columnType == Types.CLOB || columnType == Types.BLOB) { + columns[i] = new BlobColumn(columnName, columnNumber); + } else { + columns[i] = new DefaultColumn(columnName, columnNumber); + } + } + + loadNext(); + } + + @Override + public boolean hasNext() { + return !closed.get(); + } + + @Override + public Map<String, Object> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + try { + Map<String, Object> row = new LinkedHashMap<String, Object>(); + for (Column column : columns) { + row.put(column.getName(), column.getValue(resultSet)); + } + loadNext(); + return row; + } catch (SQLException e) { + close(); + throw new RuntimeCamelException("Cannot process result", e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Cannot remove from a database result"); + } + + public Set<String> getColumnNames() { + // New copy each time in order to ensure immutability + Set<String> columnNames = new HashSet<String>(columns.length); + for (Column column : columns) { + columnNames.add(column.getName()); + } + return columnNames; + } + + public void close() { + if (closed.compareAndSet(false, true)) { + safeCloseResultSet(); + safeCloseStatement(); + safeCloseConnection(); + } + } + + private void loadNext() throws SQLException { + boolean hasNext = resultSet.next(); + if (!hasNext) { + close(); + } + } + + private void safeCloseResultSet() { + try { + resultSet.close(); + } catch (SQLException e) { + LOG.warn("Error by closing result set: " + e, e); + } + } + + private void safeCloseStatement() { + try { + statement.close(); + } catch (SQLException e) { + LOG.warn("Error by closing statement: " + e, e); + } + } + + private void safeCloseConnection() { + try { + connection.close(); + } catch (SQLException e) { + LOG.warn("Error by closing connection: " + e, e); + } + } + + private static String getColumnName(ResultSetMetaData metaData, int columnNumber, boolean isJDBC4) throws SQLException { + if (isJDBC4) { + // jdbc 4 should use label to get the name + return metaData.getColumnLabel(columnNumber); + } else { + // jdbc 3 uses the label or name to get the name + try { + return metaData.getColumnLabel(columnNumber); + } catch (SQLException e) { + return metaData.getColumnName(columnNumber); + } + } + } + + private static interface Column { + String getName(); + + Object getValue(ResultSet resultSet) throws SQLException; + } + + private static class DefaultColumn implements Column { + private final String name; + protected final int columnNumber; + + private DefaultColumn(String name, int columnNumber) { + this.name = name; + this.columnNumber = columnNumber; + } + + @Override + public String getName() { + return name; + } + + @Override + public Object getValue(ResultSet resultSet) throws SQLException { + return resultSet.getObject(columnNumber); + } + } + + private static final class BlobColumn extends DefaultColumn { + private BlobColumn(String name, int columnNumber) { + super(name, columnNumber); + } + + @Override + public Object getValue(ResultSet resultSet) throws SQLException { + return resultSet.getString(columnNumber); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java new file mode 100644 index 0000000..d8c0ed5 --- /dev/null +++ b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jdbc; + +import java.util.*; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; + +public class JdbcProducerOutputTypeStreamListTest extends AbstractJdbcTestSupport { + private static final String QUERY = "select * from customer"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @Test + public void shouldReturnAnIterator() throws Exception { + result.expectedMessageCount(1); + + template.sendBody("direct:start", QUERY); + + result.assertIsSatisfied(); + assertThat(resultBodyAt(0), instanceOf(Iterator.class)); + } + + @Test + public void shouldStreamResultRows() throws Exception { + result.expectedMessageCount(3); + + template.sendBody("direct:withSplit", QUERY); + + result.assertIsSatisfied(); + assertThat(resultBodyAt(0), instanceOf(Map.class)); + assertThat(resultBodyAt(1), instanceOf(Map.class)); + assertThat(resultBodyAt(2), instanceOf(Map.class)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:start").to("jdbc:testdb?outputType=StreamList").to("mock:result"); + from("direct:withSplit").to("jdbc:testdb?outputType=StreamList").split(body()).to("mock:result"); + } + }; + } + + private Object resultBodyAt(int index) { + return result.assertExchangeReceived(index).getIn().getBody(); + } +}