Author: davsclaus
Date: Thu Jan 17 14:49:29 2013
New Revision: 1434700
URL: http://svn.apache.org/viewvc?rev=1434700&view=rev
Log:
CAMEL-5643: Allow to store message body and headers as text in JDBC agg repo.
Thanks to Alan Foster for the patch.
Added:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
(with props)
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
(with props)
camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java?rev=1434700&r1=1434699&r2=1434700&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
Thu Jan 17 14:49:29 2013
@@ -24,7 +24,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import javax.sql.DataSource;
import org.apache.camel.CamelContext;
@@ -34,7 +33,6 @@ import org.apache.camel.support.ServiceS
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
@@ -56,6 +54,7 @@ public class JdbcAggregationRepository e
private static final transient Logger LOG =
LoggerFactory.getLogger(JdbcAggregationRepository.class);
private static final String ID = "id";
private static final String EXCHANGE = "exchange";
+ private static final String BODY = "body";
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
private TransactionTemplate transactionTemplate;
@@ -69,6 +68,8 @@ public class JdbcAggregationRepository e
private boolean useRecovery = true;
private int maximumRedeliveries;
private String deadLetterUri;
+ private List<String> headersToStoreAsText;
+ private boolean storeBodyAsText;
/**
* Creates an aggregation repository
@@ -94,7 +95,7 @@ public class JdbcAggregationRepository e
public final void setTransactionManager(PlatformTransactionManager
transactionManager) {
this.transactionManager = transactionManager;
-
+
transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
@@ -109,50 +110,140 @@ public class JdbcAggregationRepository e
jdbcTemplate = new JdbcTemplate(dataSource);
}
+ @Override
public Exchange add(final CamelContext camelContext, final String
correlationId, final Exchange exchange) {
return transactionTemplate.execute(new TransactionCallback<Exchange>()
{
public Exchange doInTransaction(TransactionStatus status) {
- String sql;
Exchange result = null;
final String key = correlationId;
try {
- final byte[] data = codec.marshallExchange(camelContext,
exchange);
-
LOG.debug("Adding exchange with key: [{}]", key);
- String insert = "INSERT INTO " + getRepositoryName() + "
(" + EXCHANGE + ", " + ID + ") VALUES (?, ?)";
- String update = "UPDATE " + getRepositoryName() + " SET "
+ EXCHANGE + " = ? WHERE " + ID + " = ?";
-
boolean present = jdbcTemplate.queryForInt(
"SELECT COUNT(*) FROM " + getRepositoryName() + "
WHERE " + ID + " = ?", key) != 0;
- sql = present ? update : insert;
// Recover existing exchange with that ID
if (isReturnOldExchange() && present) {
result = get(key, getRepositoryName(), camelContext);
}
- jdbcTemplate.execute(sql,
- new
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
- @Override
- protected void setValues(PreparedStatement ps,
LobCreator lobCreator) throws SQLException {
- lobCreator.setBlobAsBytes(ps, 1, data);
- ps.setString(2, key);
- }
- });
+ if (present) {
+ update(camelContext, correlationId, exchange,
getRepositoryName());
+ } else {
+ insert(camelContext, correlationId, exchange,
getRepositoryName());
+ }
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Error adding to repository " +
repositoryName + " with key " + key, e);
}
return result;
}
});
+ }
+ /**
+ * Updates the current exchange details in the given repository table
+ *
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
+ * @param exchange the aggregated exchange
+ * @param repositoryName The name of the table
+ * @throws Exception
+ */
+ protected void update(final CamelContext camelContext, final String key,
final Exchange exchange, String repositoryName) throws Exception {
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("UPDATE ").append(repositoryName)
+ .append(" SET ")
+ .append(EXCHANGE).append(" = ?");
+ if (storeBodyAsText) {
+ queryBuilder.append(", ").append(BODY).append(" = ?");
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ queryBuilder.append(", ").append(headerName).append(" = ?");
+ }
+ }
+
+ queryBuilder.append(" WHERE ").append(ID).append(" = ?");
+
+ String sql = queryBuilder.toString();
+ insertAndUpdateHelper(camelContext, key, exchange, sql, false);
+ }
+
+ /**
+ * Inserts a new record into the given repository table
+ *
+ * @param camelContext the current CamelContext
+ * @param correlationId the correlation key
+ * @param exchange the aggregated exchange
+ * @param repositoryName The name of the table
+ * @throws Exception
+ */
+ protected void insert(final CamelContext camelContext, final String
correlationId, final Exchange exchange, String repositoryName) throws Exception
{
+ // The default totalParameterIndex is 2 for ID and Exchange. Depending
on logic this will be increased
+ int totalParameterIndex = 2;
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("INSERT INTO ").append(repositoryName)
+ .append('(')
+ .append(EXCHANGE).append(", ")
+ .append(ID);
+
+ if (storeBodyAsText) {
+ queryBuilder.append(", ").append(BODY);
+ totalParameterIndex++;
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ queryBuilder.append(", ").append(headerName);
+ totalParameterIndex++;
+ }
+ }
+
+ queryBuilder.append(") VALUES (");
+
+ for (int i = 0; i < totalParameterIndex - 1; i++) {
+ queryBuilder.append("?, ");
+ }
+ queryBuilder.append("?)");
+
+ String sql = queryBuilder.toString();
+
+ insertAndUpdateHelper(camelContext, correlationId, exchange, sql,
true);
+ }
+
+ protected void insertAndUpdateHelper(final CamelContext camelContext,
final String key, final Exchange exchange, String sql, final boolean
idComesFirst) throws Exception {
+ final byte[] data = codec.marshallExchange(camelContext, exchange);
+ jdbcTemplate.execute(sql,
+ new
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+ @Override
+ protected void setValues(PreparedStatement ps, LobCreator
lobCreator) throws SQLException {
+ int totalParameterIndex = 0;
+ lobCreator.setBlobAsBytes(ps, ++totalParameterIndex,
data);
+ if (idComesFirst) {
+ ps.setString(++totalParameterIndex, key);
+ }
+ if (storeBodyAsText) {
+ ps.setString(++totalParameterIndex,
exchange.getIn().getBody(String.class));
+ }
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ String headerValue =
exchange.getIn().getHeader(headerName, String.class);
+ ps.setString(++totalParameterIndex,
headerValue);
+ }
+ }
+ if (!idComesFirst) {
+ ps.setString(++totalParameterIndex, key);
+ }
+ }
+ });
}
+ @Override
public Exchange get(final CamelContext camelContext, final String
correlationId) {
final String key = correlationId;
Exchange result = get(key, getRepositoryName(), camelContext);
@@ -183,34 +274,27 @@ public class JdbcAggregationRepository e
});
}
+ @Override
public void remove(final CamelContext camelContext, final String
correlationId, final Exchange exchange) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus
status) {
final String key = correlationId;
final String confirmKey = exchange.getExchangeId();
try {
- final byte[] data = codec.marshallExchange(camelContext,
exchange);
-
LOG.debug("Removing key [{}]", key);
- jdbcTemplate.update("DELETE FROM " + getRepositoryName() +
" WHERE " + ID + " = ?",
- new Object[]{key});
+ jdbcTemplate.update("DELETE FROM " + getRepositoryName() +
" WHERE " + ID + " = ?", key);
+
+ insert(camelContext, confirmKey, exchange,
getRepositoryNameCompleted());
- jdbcTemplate.execute("INSERT INTO " +
getRepositoryNameCompleted() + " (" + EXCHANGE + ", " + ID + ") VALUES (?, ?)",
- new
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
- @Override
- protected void setValues(PreparedStatement ps,
LobCreator lobCreator) throws SQLException {
- lobCreator.setBlobAsBytes(ps, 1, data);
- ps.setString(2, confirmKey);
- }
- });
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Error removing key " + key + "
from repository " + repositoryName, e);
}
}
});
}
+ @Override
public void confirm(final CamelContext camelContext, final String
exchangeId) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus
status) {
@@ -224,26 +308,26 @@ public class JdbcAggregationRepository e
});
}
+ @Override
public Set<String> getKeys() {
- return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
- public LinkedHashSet<String> doInTransaction(TransactionStatus
status) {
- List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + getRepositoryName(),
- new RowMapper<String>() {
- public String mapRow(ResultSet rs, int rowNum)
throws SQLException {
- String id = rs.getString(ID);
- LOG.trace("getKey [{}]", id);
- return id;
- }
- });
- return new LinkedHashSet<String>(keys);
- }
- });
+ return getKeys(getRepositoryName());
}
+ @Override
public Set<String> scan(CamelContext camelContext) {
+ return getKeys(getRepositoryNameCompleted());
+ }
+
+ /**
+ * Returns the keys in the given repository
+ *
+ * @param repositoryName The name of the table
+ * @return Set of keys in the given repository name
+ */
+ protected Set<String> getKeys(final String repositoryName) {
return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
public LinkedHashSet<String> doInTransaction(TransactionStatus
status) {
- List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + getRepositoryNameCompleted(),
+ List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + repositoryName,
new RowMapper<String>() {
public String mapRow(ResultSet rs, int rowNum)
throws SQLException {
String id = rs.getString(ID);
@@ -256,6 +340,7 @@ public class JdbcAggregationRepository e
});
}
+ @Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
final String key = exchangeId;
Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
@@ -309,6 +394,22 @@ public class JdbcAggregationRepository e
this.returnOldExchange = returnOldExchange;
}
+ public void setJdbcCamelCodec(JdbcCamelCodec codec) {
+ this.codec = codec;
+ }
+
+ public boolean hasHeadersToStoreAsText() {
+ return this.headersToStoreAsText != null &&
!this.headersToStoreAsText.isEmpty();
+ }
+
+ public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
+ this.headersToStoreAsText = headersToStoreAsText;
+ }
+
+ public void setStoreBodyAsText(boolean storeBodyAsText) {
+ this.storeBodyAsText = storeBodyAsText;
+ }
+
/**
* @return the lobHandler
*/
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java?rev=1434700&r1=1434699&r2=1434700&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
Thu Jan 17 14:49:29 2013
@@ -31,7 +31,7 @@ import org.apache.camel.impl.DefaultExch
/**
* Adapted from HawtDBCamelCodec
*/
-public final class JdbcCamelCodec {
+public class JdbcCamelCodec {
public byte[] marshallExchange(CamelContext camelContext, Exchange
exchange) throws IOException {
// use DefaultExchangeHolder to marshal to a serialized object
Added:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java?rev=1434700&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
(added)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
Thu Jan 17 14:49:29 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Tests to ensure that arbitrary headers can be stored as raw text within a
dataSource
+ * Tests to ensure the body can be stored as readable text within a dataSource
+ */
+public class JdbcAggregateStoreAsText extends CamelSpringTestSupport {
+ protected JdbcAggregationRepository repo;
+ protected JdbcTemplate jdbcTemplate;
+ protected DataSource dataSource;
+
+ @Override
+ protected AbstractXmlApplicationContext createApplicationContext() {
+ return new
ClassPathXmlApplicationContext("org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml");
+ }
+
+ @Override
+ public void postProcessTest() throws Exception {
+ super.postProcessTest();
+
+ repo = applicationContext.getBean("repo3",
JdbcAggregationRepository.class);
+ dataSource = context.getRegistry().lookup("dataSource3",
DataSource.class);
+ jdbcTemplate = new JdbcTemplate(dataSource);
+ jdbcTemplate.afterPropertiesSet();
+ }
+
+ @Test
+ public void testStoreBodyAsTextAndCompanyNameHeaderAndAccountNameHeader()
throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABCDE");
+
+ repo.setStoreBodyAsText(true);
+ repo.setHeadersToStoreAsText(Arrays.asList("companyName",
"accountName"));
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("id", 123);
+ headers.put("companyName", "Acme");
+ headers.put("accountName", "Alan");
+
+ template.sendBodyAndHeaders("direct:start", "A", headers);
+ assertEquals("A", getAggregationRepositoryBody(123));
+ assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "B", headers);
+ assertEquals("AB", getAggregationRepositoryBody(123));
+ assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "C", headers);
+ assertEquals("ABC", getAggregationRepositoryBody(123));
+ assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "D", headers);
+ assertEquals("ABCD", getAggregationRepositoryBody(123));
+ assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "E", headers);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testStoreBodyAsTextAndNoHeaders() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABCDE");
+
+ repo.setStoreBodyAsText(true);
+ repo.setHeadersToStoreAsText(null);
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("id", 123);
+ headers.put("companyName", "Acme");
+ headers.put("accountName", "Alan");
+
+ template.sendBodyAndHeaders("direct:start", "A", headers);
+ assertEquals("A", getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals(null, getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "B", headers);
+ assertEquals("AB", getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals(null, getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "C", headers);
+ assertEquals("ABC", getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals(null, getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "D", headers);
+ assertEquals("ABCD", getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals(null, getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "E", headers);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testOnlyAccountNameHeaders() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABCDE");
+
+ repo.setStoreBodyAsText(false);
+ repo.setHeadersToStoreAsText(Arrays.asList("accountName"));
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("id", 123);
+ headers.put("companyName", "Acme");
+ headers.put("accountName", "Alan");
+
+ template.sendBodyAndHeaders("direct:start", "A", headers);
+ assertEquals(null, getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "B", headers);
+ assertEquals(null, getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "C", headers);
+ assertEquals(null, getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "D", headers);
+ assertEquals(null, getAggregationRepositoryBody(123));
+ assertEquals(null, getAggregationRepositoryCompanyName(123));
+ assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+ template.sendBodyAndHeaders("direct:start", "E", headers);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public String getAggregationRepositoryBody(int id) throws Exception {
+ return getAggregationRepositoryColumn(id, "body");
+ }
+
+ public String getAggregationRepositoryCompanyName(int id) throws Exception
{
+ return getAggregationRepositoryColumn(id, "companyName");
+ }
+
+ public String getAggregationRepositoryAccountName(int id) throws Exception
{
+ return getAggregationRepositoryColumn(id, "accountName");
+ }
+
+ public String getAggregationRepositoryColumn(int id, String columnName) {
+ return jdbcTemplate.queryForObject("SELECT " + columnName + " from
aggregationRepo3 where id = ?", String.class, id);
+ }
+}
\ No newline at end of file
Propchange:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml?rev=1434700&view=auto
==============================================================================
---
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
(added)
+++
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
Thu Jan 17 14:49:29 2013
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <import resource="JdbcSpringDataSource.xml" />
+
+ <!-- aggregate the messages using this strategy -->
+ <bean id="myAggregatorStrategy"
class="org.apache.camel.processor.aggregate.jdbc.AbstractJdbcAggregationTestSupport$MyAggregationStrategy"/>
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"
trace="true">
+ <route>
+ <from uri="direct:start"/>
+ <!-- aggregate using our strategy and jdbc repo, and complete when
we have 5 messages aggregated -->
+ <aggregate strategyRef="myAggregatorStrategy"
aggregationRepositoryRef="repo3" completionSize="5">
+ <!-- correlate by header with the key id -->
+
<correlationExpression><header>id</header></correlationExpression>
+ <!-- send aggregated messages to the mock endpoint -->
+ <to uri="mock:aggregated"/>
+ </aggregate>
+ </route>
+ </camelContext>
+
+</beans>
\ No newline at end of file
Propchange:
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Modified:
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml?rev=1434700&r1=1434699&r2=1434700&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
(original)
+++
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
Thu Jan 17 14:49:29 2013
@@ -44,6 +44,11 @@
<jdbc:script location="classpath:/sql/init2.sql"/>
</jdbc:embedded-database>
+ <!-- In Memory Database #3 -->
+ <jdbc:embedded-database id="dataSource3" type="DERBY">
+ <jdbc:script location="classpath:/sql/init3.sql"/>
+ </jdbc:embedded-database>
+
<bean id="repo1"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo1" />
<property name="transactionManager" ref="txManager1" />
@@ -56,12 +61,32 @@
<property name="dataSource" ref="dataSource2" />
</bean>
+ <!-- START SNIPPET: e3 -->
+ <bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+ <property name="repositoryName" value="aggregationRepo3"/>
+ <property name="transactionManager" ref="txManager3"/>
+ <property name="dataSource" ref="dataSource3"/>
+ <!-- configure to store the message body and following headers as text
in the repo -->
+ <property name="storeBodyAsText" value="true"/>
+ <property name="headersToStoreAsText">
+ <list>
+ <value>companyName</value>
+ <value>accountName</value>
+ </list>
+ </property>
+ </bean>
+ <!-- END SNIPPET: e3 -->
+
<bean id="txManager1"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource1"/>
</bean>
- <bean id="txManager2"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+ <bean id="txManager2"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource2"/>
</bean>
+ <bean id="txManager3"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+ <property name="dataSource" ref="dataSource3"/>
+ </bean>
+
</beans>
Added: camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql?rev=1434700&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql (added)
+++ camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql Thu Jan
17 14:49:29 2013
@@ -0,0 +1,33 @@
+-- ------------------------------------------------------------------------
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- ------------------------------------------------------------------------
+
+CREATE TABLE aggregationRepo3 (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ body varchar(1000),
+ companyName varchar(1000),
+ accountName varchar(1000),
+ constraint aggregationRepo3_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo3_completed (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ body varchar(1000),
+ companyName varchar(1000),
+ accountName varchar(1000),
+ constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
+);
\ No newline at end of file