Author: davsclaus
Date: Thu Jan 17 13:22:05 2013
New Revision: 1434662
URL: http://svn.apache.org/viewvc?rev=1434662&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after
processing etc.
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
camel/trunk/components/camel-sql/src/test/resources/log4j.properties
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
Thu Jan 17 13:22:05 2013
@@ -19,7 +19,6 @@ package org.apache.camel.component.sql;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
-import java.util.Map;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
@@ -35,75 +34,30 @@ public class DefaultSqlProcessingStrateg
private static final Logger LOG =
LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
@Override
- public void commit(SqlEndpoint endpoint, final Exchange exchange, Object
data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
- jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>()
{
- public Map<?, ?> doInPreparedStatement(PreparedStatement ps)
throws SQLException {
+ public int commit(final SqlEndpoint endpoint, final Exchange exchange,
final Object data, final JdbcTemplate jdbcTemplate, final String query) throws
Exception {
+
+ final String preparedQuery =
endpoint.getPrepareStatementStrategy().prepareQuery(query,
endpoint.isAllowNamedParameters());
+
+ return jdbcTemplate.execute(preparedQuery, new
PreparedStatementCallback<Integer>() {
+ public Integer doInPreparedStatement(PreparedStatement ps) throws
SQLException {
int expected = ps.getParameterMetaData().getParameterCount();
- Iterator<?> iterator = createIterator(exchange, query,
expected);
+ Iterator<?> iterator =
endpoint.getPrepareStatementStrategy().createPopulateIterator(query,
preparedQuery, expected, exchange, data);
if (iterator != null) {
- populateStatement(ps, iterator, expected);
+
endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator,
expected);
LOG.trace("Execute query {}", query);
ps.execute();
- }
-
- return null;
- };
- });
- }
- private Iterator<?> createIterator(Exchange exchange, final String query,
final int expectedParams) {
- Object body = exchange.getIn().getBody();
- if (body == null) {
- return null;
- }
-
- // TODO: support named parameters
-/*
- if (body instanceof Map) {
- final Map map = (Map) body;
- return new Iterator() {
-
- private int current;
-
- @Override
- public boolean hasNext() {
- return current < expectedParams;
+ int updateCount = ps.getUpdateCount();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update count {}", updateCount);
+ }
+ return updateCount;
}
- @Override
- public Object next() {
- current++;
- // TODO: Fix me
- return map.get("ID");
- }
-
- @Override
- public void remove() {
- // noop
- }
+ return 0;
};
- }*/
-
- // else force as iterator based
- Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
- return iterator;
- }
-
- private void populateStatement(PreparedStatement ps, Iterator<?> iterator,
int expectedParams) throws SQLException {
- int argNumber = 1;
- if (expectedParams > 0) {
- while (iterator != null && iterator.hasNext()) {
- Object value = iterator.next();
- LOG.trace("Setting parameter #{} with value: {}", argNumber,
value);
- ps.setObject(argNumber, value);
- argNumber++;
- }
- }
-
- if (argNumber - 1 != expectedParams) {
- throw new SQLException("Number of parameters mismatch. Expected: "
+ expectedParams + ", was:" + (argNumber - 1));
- }
+ });
}
}
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
Thu Jan 17 13:22:05 2013
@@ -51,7 +51,18 @@ public class SqlComponent extends Defaul
IntrospectionSupport.setProperties(jdbcTemplate, parameters,
"template.");
String query = remaining.replaceAll(parameterPlaceholderSubstitute,
"?");
- return new SqlEndpoint(uri, this, jdbcTemplate, query);
+
+ String onConsume = getAndRemoveParameter(parameters,
"consumer.onConsume", String.class);
+ if (onConsume == null) {
+ onConsume = getAndRemoveParameter(parameters, "onConsume",
String.class);
+ }
+ if (onConsume != null) {
+ onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute,
"?");
+ }
+
+ SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
+ endpoint.setOnConsume(onConsume);
+ return endpoint;
}
public void setDataSource(DataSource dataSource) {
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
Thu Jan 17 13:22:05 2013
@@ -45,20 +45,11 @@ public class SqlConsumer extends Schedul
private final String query;
private final JdbcTemplate jdbcTemplate;
- /**
- * Statement to run after data has been processed in the route
- */
private String onConsume;
-
- /**
- * Process resultset individually or as a list
- */
private boolean useIterator = true;
-
- /**
- * Whether allow empty resultset to be routed to the next hop
- */
private boolean routeEmptyResultSet;
+ private int expectedUpdateCount = -1;
+ private boolean breakBatchOnConsumeFail;
private static final class DataHolder {
private Exchange exchange;
@@ -92,9 +83,10 @@ public class SqlConsumer extends Schedul
public Integer doInPreparedStatement(PreparedStatement
preparedStatement) throws SQLException, DataAccessException {
Queue<DataHolder> answer = new LinkedList<DataHolder>();
+ log.debug("Executing query: {}", preparedQuery);
ResultSet rs = preparedStatement.executeQuery();
try {
- log.trace("Got result list from query {}", rs);
+ log.trace("Got result list from query: {}", rs);
RowMapperResultSetExtractor<Map<String, Object>> mapper =
new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
List<Map<String, Object>> data = mapper.extractData(rs);
@@ -166,19 +158,24 @@ public class SqlConsumer extends Schedul
pendingExchanges = total - index - 1;
// process the current exchange
- log.debug("Processing exchange: {} with properties: {}", exchange,
exchange.getProperties());
getProcessor().process(exchange);
- // TODO: support when with CAMEL-5977
- /*
try {
- if (onConsume != null) {
- SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
- endpoint.getProcessingStrategy().commit(endpoint,
exchange, data, jdbcTemplate, onConsume);
+ // we can only run on consume if there was data
+ if (onConsume != null && data != null) {
+ int updateCount =
getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data,
jdbcTemplate, onConsume);
+ if (expectedUpdateCount > -1 && updateCount !=
expectedUpdateCount) {
+ String msg = "Expected update count " +
expectedUpdateCount + " but was " + updateCount + " executing query: " +
onConsume;
+ throw new SQLException(msg);
+ }
}
} catch (Exception e) {
- handleException(e);
- }*/
+ if (breakBatchOnConsumeFail) {
+ throw e;
+ } else {
+ handleException("Error executing onConsume query " +
onConsume, e);
+ }
+ }
}
return total;
@@ -231,5 +228,28 @@ public class SqlConsumer extends Schedul
this.routeEmptyResultSet = routeEmptyResultSet;
}
+ public int getExpectedUpdateCount() {
+ return expectedUpdateCount;
+ }
+
+ /**
+ * Sets an expected update count to validate when using onConsume.
+ *
+ * @param expectedUpdateCount typically set this value to <tt>1</tt> to
expect 1 row updated.
+ */
+ public void setExpectedUpdateCount(int expectedUpdateCount) {
+ this.expectedUpdateCount = expectedUpdateCount;
+ }
+
+ public boolean isBreakBatchOnConsumeFail() {
+ return breakBatchOnConsumeFail;
+ }
+
+ /**
+ * Sets whether to break batch if onConsume failed.
+ */
+ public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
+ this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
+ }
}
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
Thu Jan 17 13:22:05 2013
@@ -32,7 +32,8 @@ public interface SqlProcessingStrategy {
* @param data The original data delivered to the route
* @param jdbcTemplate The JDBC template
* @param query The SQL query to execute
+ * @return the update count if the query returned an update count
* @throws Exception can be thrown in case of error
*/
- void commit(SqlEndpoint endpoint, Exchange exchange, Object data,
JdbcTemplate jdbcTemplate, String query) throws Exception;
+ int commit(SqlEndpoint endpoint, Exchange exchange, Object data,
JdbcTemplate jdbcTemplate, String query) throws Exception;
}
Modified:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Thu Jan 17 13:22:05 2013
@@ -20,13 +20,14 @@ import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -34,16 +35,18 @@ import org.springframework.jdbc.datasour
/**
*
*/
-@Ignore
public class SqlConsumerDeleteTest extends CamelTestSupport {
private EmbeddedDatabase db;
+ private JdbcTemplate jdbcTemplate;
@Before
public void setUp() throws Exception {
db = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+ jdbcTemplate = new JdbcTemplate(db);
+
super.setUp();
}
@@ -70,6 +73,12 @@ public class SqlConsumerDeleteTest exten
assertEquals("AMQ",
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
assertEquals("Linux",
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+ // give it a little time to delete
+ Thread.sleep(500);
+
+ // there should be no rows left in the table
+ assertEquals("Should have deleted all 3 rows", 0,
jdbcTemplate.queryForInt("select count(*) from projects"));
}
@Override
@@ -79,7 +88,7 @@ public class SqlConsumerDeleteTest exten
public void configure() throws Exception {
getContext().getComponent("sql",
SqlComponent.class).setDataSource(db);
- from("sql:select * from projects order by
id?consumer.onConsume=delete from projects where id = #")
+ from("sql:select * from projects order by
id?consumer.onConsume=delete from projects where id = :#id")
.to("mock:result");
}
};
Modified: camel/trunk/components/camel-sql/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/log4j.properties?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/log4j.properties
(original)
+++ camel/trunk/components/camel-sql/src/test/resources/log4j.properties Thu
Jan 17 13:22:05 2013
@@ -16,7 +16,7 @@
## ------------------------------------------------------------------------
#
-# The logging properties used for eclipse testing, We want to see debug output
on the console.
+# The logging properties used for testing
#
log4j.rootLogger=INFO, file