Author: davsclaus
Date: Thu Jan 17 13:45:55 2013
New Revision: 1434669

URL: http://svn.apache.org/viewvc?rev=1434669&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after 
processing etc.

Added:
    
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
      - copied, changed from r1434662, 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
    
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
      - copied, changed from r1434662, 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Modified:
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
    
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
 Thu Jan 17 13:45:55 2013
@@ -60,5 +60,27 @@ public class DefaultSqlProcessingStrateg
         });
     }
 
+    @Override
+    public int commitBatchComplete(final SqlEndpoint endpoint, final 
JdbcTemplate jdbcTemplate, final String query) throws Exception {
+        final String preparedQuery = 
endpoint.getPrepareStatementStrategy().prepareQuery(query, 
endpoint.isAllowNamedParameters());
+
+        return jdbcTemplate.execute(preparedQuery, new 
PreparedStatementCallback<Integer>() {
+            public Integer doInPreparedStatement(PreparedStatement ps) throws 
SQLException {
+                int expected = ps.getParameterMetaData().getParameterCount();
+                if (expected != 0) {
+                    throw new IllegalArgumentException("Query 
onConsumeBatchComplete " + query + " cannot have parameters, was " + expected);
+                }
+
+                LOG.trace("Execute query {}", query);
+                ps.execute();
+
+                int updateCount = ps.getUpdateCount();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Update count {}", updateCount);
+                }
+                return updateCount;
+            };
+        });
+    }
 }
 

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
 Thu Jan 17 13:45:55 2013
@@ -59,9 +59,17 @@ public class SqlComponent extends Defaul
         if (onConsume != null) {
             onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, 
"?");
         }
+        String onConsumeBatchComplete = getAndRemoveParameter(parameters, 
"consumer.onConsumeBatchComplete", String.class);
+        if (onConsumeBatchComplete == null) {
+            onConsumeBatchComplete = getAndRemoveParameter(parameters, 
"onConsumeBatchComplete", String.class);
+        }
+        if (onConsumeBatchComplete != null) {
+            onConsumeBatchComplete = 
onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
+        }
 
         SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
         endpoint.setOnConsume(onConsume);
+        endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
         return endpoint;
     }
 

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
 Thu Jan 17 13:45:55 2013
@@ -46,6 +46,7 @@ public class SqlConsumer extends Schedul
     private final JdbcTemplate jdbcTemplate;
 
     private String onConsume;
+    private String onConsumeBatchComplete;
     private boolean useIterator = true;
     private boolean routeEmptyResultSet;
     private int expectedUpdateCount = -1;
@@ -178,6 +179,19 @@ public class SqlConsumer extends Schedul
             }
         }
 
+        try {
+            if (onConsumeBatchComplete != null) {
+                int updateCount = 
getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(), 
jdbcTemplate, onConsumeBatchComplete);
+                log.debug("onConsumeBatchComplete update count {}", 
updateCount);
+            }
+        } catch (Exception e) {
+            if (breakBatchOnConsumeFail) {
+                throw e;
+            } else {
+                handleException("Error executing onConsumeBatchComplete query 
" + onConsumeBatchComplete, e);
+            }
+        }
+
         return total;
     }
 
@@ -197,6 +211,14 @@ public class SqlConsumer extends Schedul
         this.onConsume = onConsume;
     }
 
+    public String getOnConsumeBatchComplete() {
+        return onConsumeBatchComplete;
+    }
+
+    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+        this.onConsumeBatchComplete = onConsumeBatchComplete;
+    }
+
     /**
      * Indicates how resultset should be delivered to the route
      */

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
 Thu Jan 17 13:45:55 2013
@@ -37,6 +37,7 @@ public class SqlEndpoint extends Default
     private SqlProcessingStrategy processingStrategy = new 
DefaultSqlProcessingStrategy();
     private SqlPrepareStatementStrategy prepareStatementStrategy = new 
DefaultSqlPrepareStatementStrategy();
     private String onConsume;
+    private String onConsumeBatchComplete;
     private boolean allowNamedParameters = true;
 
     // TODO: onConsumeBatchDone to execute a query when batch done
@@ -54,6 +55,7 @@ public class SqlEndpoint extends Default
         SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, 
query);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
+        consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
         configureConsumer(consumer);
         return consumer;
     }
@@ -122,6 +124,14 @@ public class SqlEndpoint extends Default
         this.onConsume = onConsume;
     }
 
+    public String getOnConsumeBatchComplete() {
+        return onConsumeBatchComplete;
+    }
+
+    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+        this.onConsumeBatchComplete = onConsumeBatchComplete;
+    }
+
     public boolean isAllowNamedParameters() {
         return allowNamedParameters;
     }

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
 Thu Jan 17 13:45:55 2013
@@ -36,4 +36,16 @@ public interface SqlProcessingStrategy {
      * @throws Exception can be thrown in case of error
      */
     int commit(SqlEndpoint endpoint, Exchange exchange, Object data, 
JdbcTemplate jdbcTemplate, String query) throws Exception;
+
+    /**
+     * Commit callback when the batch is complete. This allows you to do one 
extra query after all rows has been processed in the batch.
+     *
+     * @param endpoint     the endpoint
+     * @param jdbcTemplate The JDBC template
+     * @param query        The SQL query to execute
+     * @return the update count if the query returned an update count
+     * @throws Exception can be thrown in case of error
+     */
+    int commitBatchComplete(SqlEndpoint endpoint, JdbcTemplate jdbcTemplate, 
String query) throws Exception;
+
 }

Copied: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
 (from r1434662, 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1434662&r2=1434669&rev=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
 Thu Jan 17 13:45:55 2013
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.component.sql;
 
-import java.util.List;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +30,7 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteBatchCompleteTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
     private JdbcTemplate jdbcTemplate;
@@ -60,24 +55,13 @@ public class SqlConsumerDeleteTest exten
     @Test
     public void testConsume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(3);
+        mock.expectedMessageCount(1);
 
         assertMockEndpointsSatisfied();
 
-        List<Exchange> exchanges = mock.getReceivedExchanges();
-        assertEquals(3, exchanges.size());
-
-        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Camel", 
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
-        assertEquals("AMQ", 
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Linux", 
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
-
         // give it a little tine to delete
         Thread.sleep(500);
 
-        // there should only be 1 row in the table
         assertEquals("Should have deleted all 3 rows", 0, 
jdbcTemplate.queryForInt("select count(*) from projects"));
     }
 
@@ -88,7 +72,7 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
 
-                from("sql:select * from projects order by 
id?consumer.onConsume=delete from projects where id = :#id")
+                from("sql:select * from projects order by 
id?consumer.onConsumeBatchComplete=delete from projects")
                     .to("mock:result");
             }
         };

Modified: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
 Thu Jan 17 13:45:55 2013
@@ -77,7 +77,6 @@ public class SqlConsumerDeleteTest exten
         // give it a little tine to delete
         Thread.sleep(500);
 
-        // there should only be 1 row in the table
         assertEquals("Should have deleted all 3 rows", 0, 
jdbcTemplate.queryForInt("select count(*) from projects"));
     }
 

Copied: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
 (from r1434662, 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1434662&r2=1434669&rev=1434669&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
 Thu Jan 17 13:45:55 2013
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.component.sql;
 
-import java.util.List;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +30,7 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteTransformTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
     private JdbcTemplate jdbcTemplate;
@@ -60,24 +55,13 @@ public class SqlConsumerDeleteTest exten
     @Test
     public void testConsume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(3);
+        mock.expectedBodiesReceived("The project is Camel", "The project is 
AMQ", "The project is Linux");
 
         assertMockEndpointsSatisfied();
 
-        List<Exchange> exchanges = mock.getReceivedExchanges();
-        assertEquals(3, exchanges.size());
-
-        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Camel", 
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
-        assertEquals("AMQ", 
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Linux", 
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
-
         // give it a little tine to delete
         Thread.sleep(500);
 
-        // there should only be 1 row in the table
         assertEquals("Should have deleted all 3 rows", 0, 
jdbcTemplate.queryForInt("select count(*) from projects"));
     }
 
@@ -88,7 +72,10 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db);
 
+                // even if we transform the exchange we can still do onConsume 
as we have the original data at
+                // the point when onConsume is executed
                 from("sql:select * from projects order by 
id?consumer.onConsume=delete from projects where id = :#id")
+                    .transform().simple("The project is ${body[project]}")
                     .to("mock:result");
             }
         };


Reply via email to