Repository: camel
Updated Branches:
  refs/heads/master 9e0a204e2 -> 1fb191c91


CAMEL-7447 Allow to stream the result of a database query


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16c1d36d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16c1d36d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16c1d36d

Branch: refs/heads/master
Commit: 16c1d36d848c3ae02507babf8aae7a8dac9ebd83
Parents: c160402
Author: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Authored: Thu May 22 11:53:54 2014 +0200
Committer: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Committed: Thu May 22 11:53:54 2014 +0200

----------------------------------------------------------------------
 .../camel/component/jdbc/JdbcOutputType.java    |   2 +-
 .../camel/component/jdbc/JdbcProducer.java      | 244 +++++++++----------
 .../camel/component/jdbc/ResultSetIterator.java | 188 ++++++++++++++
 .../JdbcProducerOutputTypeStreamListTest.java   |  68 ++++++
 4 files changed, 376 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java
 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java
index 183735f..30ef9df 100644
--- 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java
+++ 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcOutputType.java
@@ -17,5 +17,5 @@
 package org.apache.camel.component.jdbc;
 
 public enum JdbcOutputType {
-    SelectOne, SelectList
+    SelectOne, SelectList, StreamList
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
index 15850e8..9b8fad0 100644
--- 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
+++ 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
@@ -19,20 +19,14 @@ package org.apache.camel.component.jdbc;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import javax.sql.DataSource;
-
+import java.util.*;
+import javax.sql.*;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +69,8 @@ public class JdbcProducer extends DefaultProducer {
         String sql = exchange.getIn().getBody(String.class);
         Connection conn = null;
         Boolean autoCommit = null;
+        boolean shouldCloseResources = true;
+
         try {
             conn = dataSource.getConnection();
             autoCommit = conn.getAutoCommit();
@@ -82,7 +78,7 @@ public class JdbcProducer extends DefaultProducer {
                 conn.setAutoCommit(false);
             }
 
-            createAndExecuteSqlStatement(exchange, sql, conn);
+            shouldCloseResources = createAndExecuteSqlStatement(exchange, sql, 
conn);
 
             conn.commit();
         } catch (Exception e) {
@@ -95,39 +91,45 @@ public class JdbcProducer extends DefaultProducer {
             }
             throw e;
         } finally {
-            resetAutoCommit(conn, autoCommit);
-            closeQuietly(conn);
+            if (shouldCloseResources) {
+                resetAutoCommit(conn, autoCommit);
+                closeQuietly(conn);
+            }
         }
     }
 
     private void processingSqlWithoutSettingAutoCommit(Exchange exchange) 
throws Exception {
         String sql = exchange.getIn().getBody(String.class);
         Connection conn = null;
+        boolean shouldCloseResources = true;
+
         try {
             conn = dataSource.getConnection();
-            createAndExecuteSqlStatement(exchange, sql, conn);
+            shouldCloseResources = createAndExecuteSqlStatement(exchange, sql, 
conn);
         } finally {
-            closeQuietly(conn);
+            if (shouldCloseResources) {
+                closeQuietly(conn);
+            }
         }
     }
 
-    private void createAndExecuteSqlStatement(Exchange exchange, String sql, 
Connection conn) throws Exception {
+    private boolean createAndExecuteSqlStatement(Exchange exchange, String 
sql, Connection conn) throws Exception {
         if (getEndpoint().isUseHeadersAsParameters()) {
-            doCreateAndExecuteSqlStatementWithHeaders(exchange, sql, conn);
+            return doCreateAndExecuteSqlStatementWithHeaders(exchange, sql, 
conn);
         } else {
-            doCreateAndExecuteSqlStatement(exchange, sql, conn);
+            return doCreateAndExecuteSqlStatement(exchange, sql, conn);
         }
     }
 
-    private void doCreateAndExecuteSqlStatementWithHeaders(Exchange exchange, 
String sql, Connection conn) throws Exception {
+    private boolean doCreateAndExecuteSqlStatementWithHeaders(Exchange 
exchange, String sql, Connection conn) throws Exception {
         PreparedStatement ps = null;
         ResultSet rs = null;
+        boolean shouldCloseResources = true;
 
         try {
             final String preparedQuery = 
getEndpoint().getPrepareStatementStrategy().prepareQuery(sql, 
getEndpoint().isAllowNamedParameters());
 
-            Boolean shouldRetrieveGeneratedKeys =
-                    
exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, 
Boolean.class);
+            Boolean shouldRetrieveGeneratedKeys = 
exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, 
Boolean.class);
 
             if (shouldRetrieveGeneratedKeys) {
                 Object expectedGeneratedColumns = 
exchange.getIn().getHeader(JdbcConstants.JDBC_GENERATED_COLUMNS);
@@ -139,8 +141,7 @@ public class JdbcProducer extends DefaultProducer {
                     ps = conn.prepareStatement(preparedQuery, (int[]) 
expectedGeneratedColumns);
                 } else {
                     throw new IllegalArgumentException(
-                            "Header specifying expected returning columns 
isn't an instance of String[] or int[] but "
-                                    + expectedGeneratedColumns.getClass());
+                            "Header specifying expected returning columns 
isn't an instance of String[] or int[] but " + 
expectedGeneratedColumns.getClass());
                 }
             } else {
                 ps = conn.prepareStatement(preparedQuery);
@@ -149,7 +150,8 @@ public class JdbcProducer extends DefaultProducer {
             int expectedCount = ps.getParameterMetaData().getParameterCount();
 
             if (expectedCount > 0) {
-                Iterator<?> it = 
getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, 
preparedQuery, expectedCount, exchange, exchange.getIn().getBody());
+                Iterator<?> it = getEndpoint().getPrepareStatementStrategy()
+                        .createPopulateIterator(sql, preparedQuery, 
expectedCount, exchange, exchange.getIn().getBody());
                 
getEndpoint().getPrepareStatementStrategy().populateStatement(ps, it, 
expectedCount);
             }
 
@@ -159,6 +161,7 @@ public class JdbcProducer extends DefaultProducer {
             if (stmtExecutionResult) {
                 rs = ps.getResultSet();
                 setResultSet(exchange, rs);
+                shouldCloseResources = false;
             } else {
                 int updateCount = ps.getUpdateCount();
                 exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, 
updateCount);
@@ -168,14 +171,18 @@ public class JdbcProducer extends DefaultProducer {
                 setGeneratedKeys(exchange, ps.getGeneratedKeys());
             }
         } finally {
-            closeQuietly(rs);
-            closeQuietly(ps);
+            if (shouldCloseResources) {
+                closeQuietly(rs);
+                closeQuietly(ps);
+            }
         }
+        return shouldCloseResources;
     }
 
-    private void doCreateAndExecuteSqlStatement(Exchange exchange, String sql, 
Connection conn) throws Exception {
+    private boolean doCreateAndExecuteSqlStatement(Exchange exchange, String 
sql, Connection conn) throws Exception {
         Statement stmt = null;
         ResultSet rs = null;
+        boolean shouldCloseResources = true;
 
         try {
             stmt = conn.createStatement();
@@ -186,8 +193,7 @@ public class JdbcProducer extends DefaultProducer {
 
             LOG.debug("Executing JDBC Statement: {}", sql);
 
-            Boolean shouldRetrieveGeneratedKeys =
-                    
exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, 
Boolean.class);
+            Boolean shouldRetrieveGeneratedKeys = 
exchange.getIn().getHeader(JdbcConstants.JDBC_RETRIEVE_GENERATED_KEYS, false, 
Boolean.class);
 
             boolean stmtExecutionResult;
             if (shouldRetrieveGeneratedKeys) {
@@ -200,8 +206,7 @@ public class JdbcProducer extends DefaultProducer {
                     stmtExecutionResult = stmt.execute(sql, (int[]) 
expectedGeneratedColumns);
                 } else {
                     throw new IllegalArgumentException(
-                            "Header specifying expected returning columns 
isn't an instance of String[] or int[] but "
-                                    + expectedGeneratedColumns.getClass());
+                            "Header specifying expected returning columns 
isn't an instance of String[] or int[] but " + 
expectedGeneratedColumns.getClass());
                 }
             } else {
                 stmtExecutionResult = stmt.execute(sql);
@@ -210,6 +215,7 @@ public class JdbcProducer extends DefaultProducer {
             if (stmtExecutionResult) {
                 rs = stmt.getResultSet();
                 setResultSet(exchange, rs);
+                shouldCloseResources = false;
             } else {
                 int updateCount = stmt.getUpdateCount();
                 exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, 
updateCount);
@@ -219,9 +225,12 @@ public class JdbcProducer extends DefaultProducer {
                 setGeneratedKeys(exchange, stmt.getGeneratedKeys());
             }
         } finally {
-            closeQuietly(rs);
-            closeQuietly(stmt);
+            if (shouldCloseResources) {
+                closeQuietly(rs);
+                closeQuietly(stmt);
+            }
         }
+        return shouldCloseResources;
     }
 
     private void closeQuietly(ResultSet rs) {
@@ -264,18 +273,18 @@ public class JdbcProducer extends DefaultProducer {
         }
     }
 
-
     /**
      * Sets the generated if any to the Exchange in headers :
      * - {@link JdbcConstants#JDBC_GENERATED_KEYS_ROW_COUNT} : the row count 
of generated keys
      * - {@link JdbcConstants#JDBC_GENERATED_KEYS_DATA} : the generated keys 
data
      *
-     * @param exchange      The exchange where to store the generated keys
+     * @param exchange The exchange where to store the generated keys
      * @param generatedKeys The result set containing the generated keys
      */
     protected void setGeneratedKeys(Exchange exchange, ResultSet 
generatedKeys) throws SQLException {
         if (generatedKeys != null) {
-            List<Map<String, Object>> data = 
extractResultSetData(generatedKeys);
+            ResultSetIterator iterator = new ResultSetIterator(generatedKeys, 
getEndpoint().isUseJDBC4ColumnNameAndLabelSemantics());
+            List<Map<String, Object>> data = extractRows(iterator);
 
             
exchange.getOut().setHeader(JdbcConstants.JDBC_GENERATED_KEYS_ROW_COUNT, 
data.size());
             
exchange.getOut().setHeader(JdbcConstants.JDBC_GENERATED_KEYS_DATA, data);
@@ -286,112 +295,97 @@ public class JdbcProducer extends DefaultProducer {
      * Sets the result from the ResultSet to the Exchange as its OUT body.
      */
     protected void setResultSet(Exchange exchange, ResultSet rs) throws 
SQLException {
-        JdbcOutputType outputType = getEndpoint().getOutputType();
+        ResultSetIterator iterator = new ResultSetIterator(rs, 
getEndpoint().isUseJDBC4ColumnNameAndLabelSemantics());
 
-        if (outputType == JdbcOutputType.SelectList) {
-            List<Map<String, Object>> data = extractResultSetData(rs);
-            exchange.getOut().setHeader(JdbcConstants.JDBC_ROW_COUNT, 
data.size());
-            if (!data.isEmpty()) {
-                exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, 
data.get(0).keySet());
-            }
-            exchange.getOut().setBody(data);
+        JdbcOutputType outputType = getEndpoint().getOutputType();
+        exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, 
iterator.getColumnNames());
+        if (outputType == JdbcOutputType.StreamList) {
+            exchange.getOut().setBody(iterator);
+            exchange.addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
+        } else if (outputType == JdbcOutputType.SelectList) {
+            List<Map<String, Object>> list = extractRows(iterator);
+            exchange.getOut().setHeader(JdbcConstants.JDBC_ROW_COUNT, 
list.size());
+            exchange.getOut().setBody(list);
         } else if (outputType == JdbcOutputType.SelectOne) {
-            Object obj = queryForObject(rs);
-            exchange.getOut().setBody(obj);
+            exchange.getOut().setBody(extractSingleRow(iterator));
         }
     }
 
-    /**
-     * Extract the result from the ResultSet
-     *
-     * @param rs rs produced by the SQL request
-     * @return All the resulting rows containing each field of the ResultSet
-     */
-    protected List<Map<String, Object>> extractResultSetData(ResultSet rs) 
throws SQLException {
-        ResultSetMetaData meta = rs.getMetaData();
-
-        // should we use jdbc4 or jdbc3 semantics
-        boolean jdbc4 = getEndpoint().isUseJDBC4ColumnNameAndLabelSemantics();
-
-        int count = meta.getColumnCount();
-        List<Map<String, Object>> data = new ArrayList<Map<String, Object>>();
-        int rowNumber = 0;
-        while (rs.next() && (readSize == 0 || rowNumber < readSize)) {
-            Map<String, Object> row = new LinkedHashMap<String, Object>();
-            for (int i = 0; i < count; i++) {
-                int columnNumber = i + 1;
-                // use column label to get the name as it also handled SQL 
SELECT aliases
-                String columnName;
-                if (jdbc4) {
-                    // jdbc 4 should use label to get the name
-                    columnName = meta.getColumnLabel(columnNumber);
-                } else {
-                    // jdbc 3 uses the label or name to get the name
-                    try {
-                        columnName = meta.getColumnLabel(columnNumber);
-                    } catch (SQLException e) {
-                        columnName = meta.getColumnName(columnNumber);
-                    }
-                }
-                // use index based which should be faster
-                int columnType = meta.getColumnType(columnNumber);
-                if (columnType == Types.CLOB || columnType == Types.BLOB) {
-                    row.put(columnName, rs.getString(columnNumber));
-                } else {
-                    row.put(columnName, rs.getObject(columnNumber));
-                }
+    private List<Map<String, Object>> extractRows(ResultSetIterator iterator) {
+        try {
+            List<Map<String, Object>> result = new ArrayList<Map<String, 
Object>>();
+            int maxRowCount = readSize == 0 ? Integer.MAX_VALUE : readSize;
+            for (int i = 0; iterator.hasNext() && i < maxRowCount; i++) {
+                result.add(iterator.next());
             }
-            data.add(row);
-            rowNumber++;
+            return result;
+        } finally {
+            iterator.close();
         }
-        return data;
     }
 
+    private Object extractSingleRow(ResultSetIterator iterator) throws 
SQLException {
+        try {
+            if (!iterator.hasNext()) {
+                return null;
+            }
 
-    @SuppressWarnings("unchecked")
-    protected Object queryForObject(ResultSet rs) throws SQLException {
-        Object result = null;
-        List<Map<String, Object>> data = extractResultSetData(rs);
-        if (data.size() > 1) {
-            throw new SQLDataException("Query result not unique for 
outputType=SelectOne. Got " + data.size() + " count instead.");
-        } else if (data.size() == 1) {
-            if (getEndpoint().getOutputClass() == null) {
-                // Set content depend on number of column from query result
-                Map<String, Object> row = data.get(0);
-                if (row.size() == 1) {
-                    result = row.values().iterator().next();
-                } else {
-                    result = row;
-                }
+            Map<String, Object> row = iterator.next();
+            if (iterator.hasNext()) {
+                throw new SQLDataException("Query result not unique for 
outputType=SelectOne.");
+            } else if (getEndpoint().getOutputClass() != null) {
+                return newBeanInstance(row);
+            } else if (row.size() == 1) {
+                return row.values().iterator().next();
             } else {
-                Class<?> outputClzz = 
getEndpoint().getCamelContext().getClassResolver().resolveClass(getEndpoint().getOutputClass());
-                Object answer = 
getEndpoint().getCamelContext().getInjector().newInstance(outputClzz);
+                return row;
+            }
+        } finally {
+            iterator.close();
+        }
+    }
 
-                Map<String, Object> row = data.get(0);
-                Map<String, Object> properties = new LinkedHashMap<String, 
Object>(data.size());
+    private Object newBeanInstance(Map<String, Object> row) throws 
SQLException {
+        Class<?> outputClass = 
getEndpoint().getCamelContext().getClassResolver().resolveClass(getEndpoint().getOutputClass());
+        Object answer = 
getEndpoint().getCamelContext().getInjector().newInstance(outputClass);
 
-                // map row names using the bean row mapper
-                for (Map.Entry<String, Object> entry : row.entrySet()) {
-                    Object value = entry.getValue();
-                    String name = 
getEndpoint().getBeanRowMapper().map(entry.getKey(), value);
-                    properties.put(name, value);
-                }
-                try {
-                    IntrospectionSupport.setProperties(answer, properties);
-                } catch (Exception e) {
-                    throw new SQLException("Error setting properties on output 
class " + outputClzz, e);
-                }
+        Map<String, Object> properties = new LinkedHashMap<String, Object>();
 
-                // check we could map all properties to the bean
-                if (!properties.isEmpty()) {
-                    throw new IllegalArgumentException("Cannot map all 
properties to bean of type " + outputClzz + ". There are " + properties.size() 
+ " unmapped properties. " + properties);
-                }
-                return answer;
-            }
+        // map row names using the bean row mapper
+        for (Map.Entry<String, Object> entry : row.entrySet()) {
+            Object value = entry.getValue();
+            String name = getEndpoint().getBeanRowMapper().map(entry.getKey(), 
value);
+            properties.put(name, value);
+        }
+        try {
+            IntrospectionSupport.setProperties(answer, properties);
+        } catch (Exception e) {
+            throw new SQLException("Error setting properties on output class " 
+ outputClass, e);
         }
 
-        // If data.size is zero, let result be null.
-        return result;
+        // check we could map all properties to the bean
+        if (!properties.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Cannot map all properties to bean of type " + outputClass 
+ ". There are " + properties.size() + " unmapped properties. " + properties);
+        }
+        return answer;
     }
 
+    private static final class ResultSetIteratorCompletion implements 
Synchronization {
+        private final ResultSetIterator iterator;
+
+        private ResultSetIteratorCompletion(ResultSetIterator iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            iterator.close();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            iterator.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java
 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java
new file mode 100644
index 0000000..f471124
--- /dev/null
+++ 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/ResultSetIterator.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResultSetIterator implements Iterator<Map<String, Object>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultSetIterator.class);
+
+    private final Connection connection;
+    private final Statement statement;
+    private final ResultSet resultSet;
+    private final Column[] columns;
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    public ResultSetIterator(ResultSet resultSet, boolean isJDBC4) throws 
SQLException {
+        this.resultSet = resultSet;
+        this.statement = this.resultSet.getStatement();
+        this.connection = this.statement.getConnection();
+
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        columns = new Column[metaData.getColumnCount()];
+        for (int i = 0; i < columns.length; i++) {
+            int columnNumber = i + 1;
+            String columnName = getColumnName(metaData, columnNumber, isJDBC4);
+            int columnType = metaData.getColumnType(columnNumber);
+            if (columnType == Types.CLOB || columnType == Types.BLOB) {
+                columns[i] = new BlobColumn(columnName, columnNumber);
+            } else {
+                columns[i] = new DefaultColumn(columnName, columnNumber);
+            }
+        }
+
+        loadNext();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !closed.get();
+    }
+
+    @Override
+    public Map<String, Object> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        try {
+            Map<String, Object> row = new LinkedHashMap<String, Object>();
+            for (Column column : columns) {
+                row.put(column.getName(), column.getValue(resultSet));
+            }
+            loadNext();
+            return row;
+        } catch (SQLException e) {
+            close();
+            throw new RuntimeCamelException("Cannot process result", e);
+        }
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("Cannot remove from a database 
result");
+    }
+
+    public Set<String> getColumnNames() {
+        // New copy each time in order to ensure immutability
+        Set<String> columnNames = new HashSet<String>(columns.length);
+        for (Column column : columns) {
+            columnNames.add(column.getName());
+        }
+        return columnNames;
+    }
+
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            safeCloseResultSet();
+            safeCloseStatement();
+            safeCloseConnection();
+        }
+    }
+
+    private void loadNext() throws SQLException {
+        boolean hasNext = resultSet.next();
+        if (!hasNext) {
+            close();
+        }
+    }
+
+    private void safeCloseResultSet() {
+        try {
+            resultSet.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing result set: " + e, e);
+        }
+    }
+
+    private void safeCloseStatement() {
+        try {
+            statement.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing statement: " + e, e);
+        }
+    }
+
+    private void safeCloseConnection() {
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing connection: " + e, e);
+        }
+    }
+
+    private static String getColumnName(ResultSetMetaData metaData, int 
columnNumber, boolean isJDBC4) throws SQLException {
+        if (isJDBC4) {
+            // jdbc 4 should use label to get the name
+            return metaData.getColumnLabel(columnNumber);
+        } else {
+            // jdbc 3 uses the label or name to get the name
+            try {
+                return metaData.getColumnLabel(columnNumber);
+            } catch (SQLException e) {
+                return metaData.getColumnName(columnNumber);
+            }
+        }
+    }
+
+    private static interface Column {
+        String getName();
+
+        Object getValue(ResultSet resultSet) throws SQLException;
+    }
+
+    private static class DefaultColumn implements Column {
+        private final String name;
+        protected final int columnNumber;
+
+        private DefaultColumn(String name, int columnNumber) {
+            this.name = name;
+            this.columnNumber = columnNumber;
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public Object getValue(ResultSet resultSet) throws SQLException {
+            return resultSet.getObject(columnNumber);
+        }
+    }
+
+    private static final class BlobColumn extends DefaultColumn {
+        private BlobColumn(String name, int columnNumber) {
+            super(name, columnNumber);
+        }
+
+        @Override
+        public Object getValue(ResultSet resultSet) throws SQLException {
+            return resultSet.getString(columnNumber);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/16c1d36d/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java
 
b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java
new file mode 100644
index 0000000..d8c0ed5
--- /dev/null
+++ 
b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.util.*;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class JdbcProducerOutputTypeStreamListTest extends 
AbstractJdbcTestSupport {
+    private static final String QUERY = "select * from customer";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void shouldReturnAnIterator() throws Exception {
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", QUERY);
+
+        result.assertIsSatisfied();
+        assertThat(resultBodyAt(0), instanceOf(Iterator.class));
+    }
+
+    @Test
+    public void shouldStreamResultRows() throws Exception {
+        result.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplit", QUERY);
+
+        result.assertIsSatisfied();
+        assertThat(resultBodyAt(0), instanceOf(Map.class));
+        assertThat(resultBodyAt(1), instanceOf(Map.class));
+        assertThat(resultBodyAt(2), instanceOf(Map.class));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                
from("direct:start").to("jdbc:testdb?outputType=StreamList").to("mock:result");
+                
from("direct:withSplit").to("jdbc:testdb?outputType=StreamList").split(body()).to("mock:result");
+            }
+        };
+    }
+
+    private Object resultBodyAt(int index) {
+        return result.assertExchangeReceived(index).getIn().getBody();
+    }
+}

Reply via email to