Author: davsclaus
Date: Thu Jan 17 13:22:05 2013
New Revision: 1434662

URL: http://svn.apache.org/viewvc?rev=1434662&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after 
processing etc.

Modified:
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
    
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
    camel/trunk/components/camel-sql/src/test/resources/log4j.properties

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
 Thu Jan 17 13:22:05 2013
@@ -19,7 +19,6 @@ package org.apache.camel.component.sql;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Iterator;
-import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.slf4j.Logger;
@@ -35,75 +34,30 @@ public class DefaultSqlProcessingStrateg
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
 
     @Override
-    public void commit(SqlEndpoint endpoint, final Exchange exchange, Object 
data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
-        jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>() 
{
-            public Map<?, ?> doInPreparedStatement(PreparedStatement ps) 
throws SQLException {
+    public int commit(final SqlEndpoint endpoint, final Exchange exchange, 
final Object data, final JdbcTemplate jdbcTemplate, final String query) throws 
Exception {
+
+        final String preparedQuery = 
endpoint.getPrepareStatementStrategy().prepareQuery(query, 
endpoint.isAllowNamedParameters());
+
+        return jdbcTemplate.execute(preparedQuery, new 
PreparedStatementCallback<Integer>() {
+            public Integer doInPreparedStatement(PreparedStatement ps) throws 
SQLException {
                 int expected = ps.getParameterMetaData().getParameterCount();
 
-                Iterator<?> iterator = createIterator(exchange, query, 
expected);
+                Iterator<?> iterator = 
endpoint.getPrepareStatementStrategy().createPopulateIterator(query, 
preparedQuery, expected, exchange, data);
                 if (iterator != null) {
-                    populateStatement(ps, iterator, expected);
+                    
endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, 
expected);
                     LOG.trace("Execute query {}", query);
                     ps.execute();
-                }
-
-                return null;
-            };
-        });
-    }
 
-    private Iterator<?> createIterator(Exchange exchange, final String query, 
final int expectedParams) {
-        Object body = exchange.getIn().getBody();
-        if (body == null) {
-            return null;
-        }
-
-        // TODO: support named parameters
-/*
-        if (body instanceof Map) {
-            final Map map = (Map) body;
-            return new Iterator() {
-
-                private int current;
-
-                @Override
-                public boolean hasNext() {
-                    return current < expectedParams;
+                    int updateCount = ps.getUpdateCount();
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Update count {}", updateCount);
+                    }
+                    return updateCount;
                 }
 
-                @Override
-                public Object next() {
-                    current++;
-                    // TODO: Fix me
-                    return map.get("ID");
-                }
-
-                @Override
-                public void remove() {
-                    // noop
-                }
+                return 0;
             };
-        }*/
-
-        // else force as iterator based
-        Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
-        return iterator;
-    }
-
-    private void populateStatement(PreparedStatement ps, Iterator<?> iterator, 
int expectedParams) throws SQLException {
-        int argNumber = 1;
-        if (expectedParams > 0) {
-            while (iterator != null && iterator.hasNext()) {
-                Object value = iterator.next();
-                LOG.trace("Setting parameter #{} with value: {}", argNumber, 
value);
-                ps.setObject(argNumber, value);
-                argNumber++;
-            }
-        }
-
-        if (argNumber - 1 != expectedParams) {
-            throw new SQLException("Number of parameters mismatch. Expected: " 
+ expectedParams + ", was:" + (argNumber - 1));
-        }
+        });
     }
 
 }

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
 Thu Jan 17 13:22:05 2013
@@ -51,7 +51,18 @@ public class SqlComponent extends Defaul
         IntrospectionSupport.setProperties(jdbcTemplate, parameters, 
"template.");
 
         String query = remaining.replaceAll(parameterPlaceholderSubstitute, 
"?");
-        return new SqlEndpoint(uri, this, jdbcTemplate, query);
+
+        String onConsume = getAndRemoveParameter(parameters, 
"consumer.onConsume", String.class);
+        if (onConsume == null) {
+            onConsume = getAndRemoveParameter(parameters, "onConsume", 
String.class);
+        }
+        if (onConsume != null) {
+            onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, 
"?");
+        }
+
+        SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
+        endpoint.setOnConsume(onConsume);
+        return endpoint;
     }
 
     public void setDataSource(DataSource dataSource) {

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
 Thu Jan 17 13:22:05 2013
@@ -45,20 +45,11 @@ public class SqlConsumer extends Schedul
     private final String query;
     private final JdbcTemplate jdbcTemplate;
 
-    /**
-     * Statement to run after data has been processed in the route
-     */
     private String onConsume;
-
-    /**
-     * Process resultset individually or as a list
-     */
     private boolean useIterator = true;
-
-    /**
-     * Whether allow empty resultset to be routed to the next hop
-     */
     private boolean routeEmptyResultSet;
+    private int expectedUpdateCount = -1;
+    private boolean breakBatchOnConsumeFail;
 
     private static final class DataHolder {
         private Exchange exchange;
@@ -92,9 +83,10 @@ public class SqlConsumer extends Schedul
             public Integer doInPreparedStatement(PreparedStatement 
preparedStatement) throws SQLException, DataAccessException {
                 Queue<DataHolder> answer = new LinkedList<DataHolder>();
 
+                log.debug("Executing query: {}", preparedQuery);
                 ResultSet rs = preparedStatement.executeQuery();
                 try {
-                    log.trace("Got result list from query {}", rs);
+                    log.trace("Got result list from query: {}", rs);
 
                     RowMapperResultSetExtractor<Map<String, Object>> mapper = 
new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
                     List<Map<String, Object>> data = mapper.extractData(rs);
@@ -166,19 +158,24 @@ public class SqlConsumer extends Schedul
             pendingExchanges = total - index - 1;
 
             // process the current exchange
-            log.debug("Processing exchange: {} with properties: {}", exchange, 
exchange.getProperties());
             getProcessor().process(exchange);
 
-            // TODO: support when with CAMEL-5977
-            /*
             try {
-                if (onConsume != null) {
-                    SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
-                    endpoint.getProcessingStrategy().commit(endpoint, 
exchange, data, jdbcTemplate, onConsume);
+                // we can only run on consume if there was data
+                if (onConsume != null && data != null) {
+                    int updateCount = 
getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, 
jdbcTemplate, onConsume);
+                    if (expectedUpdateCount > -1 && updateCount != 
expectedUpdateCount) {
+                        String msg = "Expected update count " + 
expectedUpdateCount + " but was " + updateCount + " executing query: " + 
onConsume;
+                        throw new SQLException(msg);
+                    }
                 }
             } catch (Exception e) {
-                handleException(e);
-            }*/
+                if (breakBatchOnConsumeFail) {
+                    throw e;
+                } else {
+                    handleException("Error executing onConsume query " + 
onConsume, e);
+                }
+            }
         }
 
         return total;
@@ -231,5 +228,28 @@ public class SqlConsumer extends Schedul
         this.routeEmptyResultSet = routeEmptyResultSet;
     }
 
+    public int getExpectedUpdateCount() {
+        return expectedUpdateCount;
+    }
+
+    /**
+     * Sets an expected update count to validate when using onConsume.
+     *
+     * @param expectedUpdateCount typically set this value to <tt>1</tt> to 
expect 1 row updated.
+     */
+    public void setExpectedUpdateCount(int expectedUpdateCount) {
+        this.expectedUpdateCount = expectedUpdateCount;
+    }
+
+    public boolean isBreakBatchOnConsumeFail() {
+        return breakBatchOnConsumeFail;
+    }
+
+    /**
+     * Sets whether to break batch if onConsume failed.
+     */
+    public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
+        this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
+    }
 }
 

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
 Thu Jan 17 13:22:05 2013
@@ -32,7 +32,8 @@ public interface SqlProcessingStrategy {
      * @param data         The original data delivered to the route
      * @param jdbcTemplate The JDBC template
      * @param query        The SQL query to execute
+     * @return the update count if the query returned an update count
      * @throws Exception can be thrown in case of error
      */
-    void commit(SqlEndpoint endpoint, Exchange exchange, Object data, 
JdbcTemplate jdbcTemplate, String query) throws Exception;
+    int commit(SqlEndpoint endpoint, Exchange exchange, Object data, 
JdbcTemplate jdbcTemplate, String query) throws Exception;
 }

Modified: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
 Thu Jan 17 13:22:05 2013
@@ -20,13 +20,14 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
 import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
 import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -34,16 +35,18 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-@Ignore
 public class SqlConsumerDeleteTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
+    private JdbcTemplate jdbcTemplate;
 
     @Before
     public void setUp() throws Exception {
         db = new EmbeddedDatabaseBuilder()
             
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
 
+        jdbcTemplate = new JdbcTemplate(db);
+
         super.setUp();
     }
 
@@ -70,6 +73,12 @@ public class SqlConsumerDeleteTest exten
         assertEquals("AMQ", 
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
         assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
         assertEquals("Linux", 
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // give it a little time to delete
+        Thread.sleep(500);
+
+        // there should be no rows left in the table
+        assertEquals("Should have deleted all 3 rows", 0, 
jdbcTemplate.queryForInt("select count(*) from projects"));
     }
 
     @Override
@@ -79,7 +88,7 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
 
-                from("sql:select * from projects order by 
id?consumer.onConsume=delete from projects where id = #")
+                from("sql:select * from projects order by 
id?consumer.onConsume=delete from projects where id = :#id")
                     .to("mock:result");
             }
         };

Modified: camel/trunk/components/camel-sql/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/log4j.properties?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/log4j.properties 
(original)
+++ camel/trunk/components/camel-sql/src/test/resources/log4j.properties Thu 
Jan 17 13:22:05 2013
@@ -16,7 +16,7 @@
 ## ------------------------------------------------------------------------
 
 #
-# The logging properties used for eclipse testing, We want to see debug output 
on the console.
+# The logging properties used for testing
 #
 log4j.rootLogger=INFO, file
 


Reply via email to