Author: davsclaus
Date: Thu Jan 17 08:12:39 2013
New Revision: 1434584
URL: http://svn.apache.org/viewvc?rev=1434584&view=rev
Log:
CAMEL-5976: camel-sql now has batch consumer. Work in progress.
Added:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
(with props)
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
(with props)
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
(with props)
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(with props)
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
(with props)
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
camel/trunk/components/camel-sql/src/test/resources/log4j.properties
Added:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434584&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
(added)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+
+/**
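+ * Default {@link SqlProcessingStrategy} which executes the given query as a prepared statement, binding its parameters from the message body.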
+ */
+public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
+
+ @Override
+ public void commit(SqlEndpoint endpoint, final Exchange exchange, Object
data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
+ jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>()
{
+ public Map<?, ?> doInPreparedStatement(PreparedStatement ps)
throws SQLException {
+ int expected = ps.getParameterMetaData().getParameterCount();
+
+ Iterator<?> iterator = createIterator(exchange, query,
expected);
+ if (iterator != null) {
+ populateStatement(ps, iterator, expected);
+ LOG.trace("Execute query {}", query);
+ ps.execute();
+ }
+
+ return null;
+ };
+ });
+ }
+
+ private Iterator<?> createIterator(Exchange exchange, final String query,
final int expectedParams) {
+ Object body = exchange.getIn().getBody();
+ if (body == null) {
+ return null;
+ }
+
+ // TODO: support named parameters
+/*
+ if (body instanceof Map) {
+ final Map map = (Map) body;
+ return new Iterator() {
+
+ private int current;
+
+ @Override
+ public boolean hasNext() {
+ return current < expectedParams;
+ }
+
+ @Override
+ public Object next() {
+ current++;
+ // TODO: Fix me
+ return map.get("ID");
+ }
+
+ @Override
+ public void remove() {
+ // noop
+ }
+ };
+ }*/
+
+ // else force as iterator based
+ Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
+ return iterator;
+ }
+
+ private void populateStatement(PreparedStatement ps, Iterator<?> iterator,
int expectedParams) throws SQLException {
+ int argNumber = 1;
+ if (expectedParams > 0) {
+ while (iterator != null && iterator.hasNext()) {
+ Object value = iterator.next();
+ LOG.trace("Setting parameter #{} with value: {}", argNumber,
value);
+ ps.setObject(argNumber, value);
+ argNumber++;
+ }
+ }
+
+ if (argNumber - 1 != expectedParams) {
+ throw new SQLException("Number of parameters mismatch. Expected: "
+ expectedParams + ", was:" + (argNumber - 1));
+ }
+ }
+
+}
+
Propchange:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434584&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
(added)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
+
+/**
+ * Scheduled batch polling consumer which executes the SQL query and routes the result as exchanges.
+ */
+public class SqlConsumer extends ScheduledBatchPollingConsumer {
+
+ private final String query;
+ private final JdbcTemplate jdbcTemplate;
+
+ /**
+ * Statement to run after data has been processed in the route
+ */
+ private String onConsume;
+
+ /**
+ * Process resultset individually or as a list
+ */
+ private boolean useIterator = true;
+
+ /**
+ * Whether to allow an empty resultset to be routed to the next hop
+ */
+ private boolean routeEmptyResultSet;
+
+ private static final class DataHolder {
+ private Exchange exchange;
+ private Object data;
+
+ private DataHolder() {
+ }
+ }
+
+ public SqlConsumer(Endpoint endpoint, Processor processor, JdbcTemplate
jdbcTemplate, String query) {
+ super(endpoint, processor);
+ this.jdbcTemplate = jdbcTemplate;
+ this.query = query;
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ // must reset for each poll
+ shutdownRunningTask = null;
+ pendingExchanges = 0;
+
+ Integer messagePolled = jdbcTemplate.execute(query, new
PreparedStatementCallback<Integer>() {
+ @Override
+ public Integer doInPreparedStatement(PreparedStatement
preparedStatement) throws SQLException, DataAccessException {
+ Queue<DataHolder> answer = new LinkedList<DataHolder>();
+
+ ResultSet rs = preparedStatement.executeQuery();
+ try {
+ log.trace("Got result list from query {}", rs);
+
+ RowMapperResultSetExtractor<Map<String, Object>> mapper =
new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
+ List<Map<String, Object>> data = mapper.extractData(rs);
+
+ // create a list of exchange objects with the data
+ if (useIterator) {
+ for (Map<String, Object> item : data) {
+ Exchange exchange = createExchange(item);
+ DataHolder holder = new DataHolder();
+ holder.exchange = exchange;
+ holder.data = item;
+ answer.add(holder);
+ }
+ } else {
+ if (!data.isEmpty() || routeEmptyResultSet) {
+ Exchange exchange = createExchange(data);
+ DataHolder holder = new DataHolder();
+ holder.exchange = exchange;
+ holder.data = data;
+ answer.add(holder);
+ }
+ }
+ } finally {
+ rs.close();
+ }
+
+ // process all the exchanges in this batch
+ try {
+ int rows = processBatch(CastUtils.cast(answer));
+ return Integer.valueOf(rows);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+ });
+
+ return messagePolled;
+ }
+
+ protected Exchange createExchange(Object data) {
+ final Exchange exchange =
getEndpoint().createExchange(ExchangePattern.InOnly);
+ Message msg = exchange.getIn();
+ msg.setBody(data);
+ return exchange;
+ }
+
+ @Override
+ public int processBatch(Queue<Object> exchanges) throws Exception {
+ int total = exchanges.size();
+
+ // limit if needed
+ if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+ log.debug("Limiting to maximum messages to poll " +
maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+ total = maxMessagesPerPoll;
+ }
+
+ for (int index = 0; index < total && isBatchAllowed(); index++) {
+ // only loop if we are started (allowed to run)
+ DataHolder holder = ObjectHelper.cast(DataHolder.class,
exchanges.poll());
+ Exchange exchange = holder.exchange;
+ Object data = holder.data;
+
+ // add current index and total as properties
+ exchange.setProperty(Exchange.BATCH_INDEX, index);
+ exchange.setProperty(Exchange.BATCH_SIZE, total);
+ exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+ // update pending number of exchanges
+ pendingExchanges = total - index - 1;
+
+ // process the current exchange
+ log.debug("Processing exchange: {} with properties: {}", exchange,
exchange.getProperties());
+ getProcessor().process(exchange);
+
+ // TODO: support when with CAMEL-5977
+ /*
+ try {
+ if (onConsume != null) {
+ SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
+ endpoint.getProcessingStrategy().commit(endpoint,
exchange, data, jdbcTemplate, onConsume);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }*/
+ }
+
+ return total;
+ }
+
+ /**
+ * Gets the statement(s) to run after successful processing.
+ * Use comma to separate multiple statements.
+ */
+ public String getOnConsume() {
+ return onConsume;
+ }
+
+ /**
+ * Sets the statement to run after successful processing.
+ * Use comma to separate multiple statements.
+ */
+ public void setOnConsume(String onConsume) {
+ this.onConsume = onConsume;
+ }
+
+ /**
+ * Indicates how resultset should be delivered to the route
+ */
+ public boolean isUseIterator() {
+ return useIterator;
+ }
+
+ /**
+ * Sets how resultset should be delivered to route.
+ * Indicates delivery as either a list or individual object.
+ * Defaults to true, in which case each row is delivered as its own exchange.
+ */
+ public void setUseIterator(boolean useIterator) {
+ this.useIterator = useIterator;
+ }
+
+ /**
+ * Indicates whether empty resultset should be allowed to be sent to the
next hop or not
+ */
+ public boolean isRouteEmptyResultSet() {
+ return routeEmptyResultSet;
+ }
+
+ /**
+ * Sets whether empty resultset should be allowed to be sent to the next
hop.
+ * Defaults to false, so an empty resultset is filtered out.
+ */
+ public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
+ this.routeEmptyResultSet = routeEmptyResultSet;
+ }
+
+}
+
Propchange:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1434584&r1=1434583&r2=1434584&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
Thu Jan 17 08:12:39 2013
@@ -20,7 +20,7 @@ import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultPollingEndpoint;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -29,10 +29,15 @@ import org.springframework.jdbc.core.Jdb
* question marks (that are parameter placeholders), sharp signs should be
used.
* This is because in camel question mark has other meaning.
*/
-public class SqlEndpoint extends DefaultEndpoint {
+public class SqlEndpoint extends DefaultPollingEndpoint {
private JdbcTemplate jdbcTemplate;
private String query;
private boolean batch;
+ private int maxMessagesPerPoll;
+ private SqlProcessingStrategy processingStrategy = new
DefaultSqlProcessingStrategy();
+ private String onConsume;
+
+ // TODO: onConsumeBatchDone to execute a query when batch done
public SqlEndpoint() {
}
@@ -42,9 +47,13 @@ public class SqlEndpoint extends Default
this.jdbcTemplate = jdbcTemplate;
this.query = query;
}
-
+
public Consumer createConsumer(Processor processor) throws Exception {
- throw new UnsupportedOperationException("Not implemented");
+ SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate,
query);
+ consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ consumer.setOnConsume(getOnConsume());
+ configureConsumer(consumer);
+ return consumer;
}
public Producer createProducer() throws Exception {
@@ -79,6 +88,30 @@ public class SqlEndpoint extends Default
this.batch = batch;
}
+ public int getMaxMessagesPerPoll() {
+ return maxMessagesPerPoll;
+ }
+
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ this.maxMessagesPerPoll = maxMessagesPerPoll;
+ }
+
+ public SqlProcessingStrategy getProcessingStrategy() {
+ return processingStrategy;
+ }
+
+ public void setProcessingStrategy(SqlProcessingStrategy
processingStrategy) {
+ this.processingStrategy = processingStrategy;
+ }
+
+ public String getOnConsume() {
+ return onConsume;
+ }
+
+ public void setOnConsume(String onConsume) {
+ this.onConsume = onConsume;
+ }
+
@Override
protected String createEndpointUri() {
// Make sure it's properly encoded
Added:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434584&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
(added)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Processing strategy for dealing with SQL when consuming.
+ */
+public interface SqlProcessingStrategy {
+
+ /**
+ * Commit callback if there is a query to be run after processing.
+ *
+ * @param endpoint the endpoint
+ * @param exchange The exchange after it has been processed
+ * @param data The original data delivered to the route
+ * @param jdbcTemplate The JDBC template
+ * @param query The SQL query to execute
+ * @throws Exception can be thrown in case of error
+ */
+ void commit(SqlEndpoint endpoint, Exchange exchange, Object data,
JdbcTemplate jdbcTemplate, String query) throws Exception;
+}
Propchange:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434584&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(added)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
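+ * Tests the SQL consumer with a delete statement configured via consumer.onConsume (currently ignored).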
+ */
+@Ignore
+public class SqlConsumerDeleteTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges = mock.getReceivedExchanges();
+ assertEquals(3, exchanges.size());
+
+ assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Camel",
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+ assertEquals("AMQ",
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Linux",
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql",
SqlComponent.class).setDataSource(db);
+
+ from("sql:select * from projects order by
id?consumer.onConsume=delete from projects where id = #")
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java?rev=1434584&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
(added)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ * Tests that the SQL consumer routes the rows of the projects table, in id order.
+ */
+public class SqlConsumerTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(3);
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges = mock.getReceivedExchanges();
+ assertTrue(exchanges.size() >= 3);
+
+ assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Camel",
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+ assertEquals("AMQ",
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Linux",
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql",
SqlComponent.class).setDataSource(db);
+
+ from("sql:select * from projects order by id")
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-sql/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/log4j.properties?rev=1434584&r1=1434583&r2=1434584&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/log4j.properties
(original)
+++ camel/trunk/components/camel-sql/src/test/resources/log4j.properties Thu
Jan 17 08:12:39 2013
@@ -20,8 +20,9 @@
#
log4j.rootLogger=INFO, file
-#log4j.logger.org.apache.activemq=DEBUG
-#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component.sql=DEBUG
+#log4j.logger.org.apache.camel.processor.aggregate.sql=DEBUG
+#log4j.logger.org.apache.camel.processor.idempotent.sql=DEBUG
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender