Author: romkal
Date: Mon May 12 23:18:48 2008
New Revision: 655746

URL: http://svn.apache.org/viewvc?rev=655746&view=rev
Log:
CAMEL-319: Added support for sql updates in SQL component

Modified:
    
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
    
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
    
activemq/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java

Modified: 
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=655746&r1=655745&r2=655746&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
 (original)
+++ 
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
 Mon May 12 23:18:48 2008
@@ -49,13 +49,7 @@
         this.query = query;
     }
 
-    public SqlEndpoint(String endpointUri, JdbcTemplate jdbcTemplate, String 
query) {
-        super(endpointUri);
-        this.jdbcTemplate = jdbcTemplate;
-        this.query = query;
-    }
-
-    public Consumer<DefaultExchange> createConsumer(Processor arg0) throws 
Exception {
+    public Consumer<DefaultExchange> createConsumer(Processor processor) 
throws Exception {
         throw new UnsupportedOperationException("Not yet implemented");
     }
 

Modified: 
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java?rev=655746&r1=655745&r2=655746&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
 Mon May 12 23:18:48 2008
@@ -16,18 +16,25 @@
  */
 package org.apache.camel.component.sql;
 
-import java.util.ArrayList;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
 import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
 
 public class SqlProducer extends DefaultProducer<DefaultExchange> {
 
-    private String query;
+    public static final String UPDATE_COUNT = 
"org.apache.camel.sql.update-count";
+
+       private String query;
 
     private JdbcTemplate jdbcTemplate;
 
@@ -37,14 +44,30 @@
         this.query = query;
     }
 
-    public void process(Exchange exchange) throws Exception {
-        List<Object> arguments = new ArrayList<Object>();
-        for (Iterator<?> i = exchange.getIn().getBody(Iterator.class); 
i.hasNext();) {
-            arguments.add(i.next());
-        }
-
-        List result = jdbcTemplate.queryForList(query, arguments.toArray());
-        exchange.getOut().setBody(result);
+    public void process(final Exchange exchange) throws Exception {
+        
+        
+        jdbcTemplate.execute(query, new PreparedStatementCallback() {
+               
+                       public Object doInPreparedStatement(PreparedStatement 
ps)
+                                       throws SQLException, 
DataAccessException {
+                               int argNumber = 1;
+                               for (Iterator<?> i = 
exchange.getIn().getBody(Iterator.class); i.hasNext();) {
+                           ps.setObject(argNumber++, i.next());
+                       }
+                               boolean isResultSet = ps.execute();
+                               if (isResultSet) {
+                                       RowMapperResultSetExtractor mapper = 
new RowMapperResultSetExtractor(new ColumnMapRowMapper());
+                                       List result = (List) 
mapper.extractData(ps.getResultSet());
+                                       exchange.getOut().setBody(result);
+                               } else {
+                                       
exchange.getIn().setHeader(UPDATE_COUNT, ps.getUpdateCount());
+                               }
+                               return null;
+                       }
+               
+               });
+        
     }
 
 }

Modified: 
activemq/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java?rev=655746&r1=655745&r2=655746&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
 Mon May 12 23:18:48 2008
@@ -25,6 +25,7 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.springframework.dao.EmptyResultDataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.SingleConnectionDataSource;
 
@@ -37,6 +38,7 @@
     protected String user = "sa";
     protected String password = "";
     private DataSource ds;
+       private JdbcTemplate jdbcTemplate;
 
     public void testSimpleBody() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
@@ -98,11 +100,28 @@
         assertEquals("Camel", row1.get("PROJECT"));
     }
 
+    public void testInsert() throws Exception {
+       MockEndpoint mock = getMockEndpoint("mock:result");
+       mock.expectedMessageCount(1);
+       
+               template.sendBody("direct:insert", new Object[] {10, "test", 
"test"});
+               mock.assertIsSatisfied();
+               try {
+                       String projectName = (String) 
jdbcTemplate.queryForObject("select project from projects where id = 10", 
String.class);
+                       assertEquals("test", projectName);
+               } catch (EmptyResultDataAccessException e) {
+                       fail("no row inserted");
+               }
+               
+               Integer actualUpdateCount = 
mock.getExchanges().get(0).getIn().getHeader(SqlProducer.UPDATE_COUNT, 
Integer.class);
+               assertEquals((Integer)1, actualUpdateCount);
+       }
+    
     protected void setUp() throws Exception {
         Class.forName(driverClass);
         super.setUp();
 
-        JdbcTemplate jdbcTemplate = new JdbcTemplate(ds);
+        jdbcTemplate = new JdbcTemplate(ds);
         jdbcTemplate.execute("create table projects (id integer primary key,"
                              + "project varchar(10), license varchar(5))");
         jdbcTemplate.execute("insert into projects values (1, 'Camel', 
'ASF')");
@@ -135,6 +154,9 @@
                     .to("sql:select * from projects where license = # order by 
id?template.maxRows=1")
                     .to("mock:result");
 
+                from("direct:insert")
+                       .to("sql:insert into projects values (#, #, #)")
+                       .to("mock:result");
             }
         };
     }


Reply via email to