Author: davsclaus
Date: Thu Jan 17 14:49:29 2013
New Revision: 1434700

URL: http://svn.apache.org/viewvc?rev=1434700&view=rev
Log:
CAMEL-5643: Allow to store message body and headers as text in JDBC agg repo. 
Thanks to Alan Foster for the patch.

Added:
    
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
   (with props)
    
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
   (with props)
    camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql
Modified:
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
    
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
    
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java?rev=1434700&r1=1434699&r2=1434700&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
 Thu Jan 17 14:49:29 2013
@@ -24,7 +24,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import javax.sql.DataSource;
 
 import org.apache.camel.CamelContext;
@@ -34,7 +33,6 @@ import org.apache.camel.support.ServiceS
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.dao.EmptyResultDataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.RowMapper;
@@ -56,6 +54,7 @@ public class JdbcAggregationRepository e
     private static final transient Logger LOG = 
LoggerFactory.getLogger(JdbcAggregationRepository.class);
     private static final String ID = "id";
     private static final String EXCHANGE = "exchange";
+    private static final String BODY = "body";
     private PlatformTransactionManager transactionManager;
     private DataSource dataSource;
     private TransactionTemplate transactionTemplate;
@@ -69,6 +68,8 @@ public class JdbcAggregationRepository e
     private boolean useRecovery = true;
     private int maximumRedeliveries;
     private String deadLetterUri;
+    private List<String> headersToStoreAsText;
+    private boolean storeBodyAsText;
 
     /**
      * Creates an aggregation repository
@@ -94,7 +95,7 @@ public class JdbcAggregationRepository e
 
     public final void setTransactionManager(PlatformTransactionManager 
transactionManager) {
         this.transactionManager = transactionManager;
-        
+
         transactionTemplate = new TransactionTemplate(transactionManager);
         
transactionTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
 
@@ -109,50 +110,140 @@ public class JdbcAggregationRepository e
         jdbcTemplate = new JdbcTemplate(dataSource);
     }
 
+    @Override
     public Exchange add(final CamelContext camelContext, final String 
correlationId, final Exchange exchange) {
         return transactionTemplate.execute(new TransactionCallback<Exchange>() 
{
 
             public Exchange doInTransaction(TransactionStatus status) {
-                String sql;
                 Exchange result = null;
                 final String key = correlationId;
 
                 try {
-                    final byte[] data = codec.marshallExchange(camelContext, 
exchange);
-
                     LOG.debug("Adding exchange with key: [{}]", key);
 
-                    String insert = "INSERT INTO " + getRepositoryName() + " 
(" + EXCHANGE + ", " + ID + ") VALUES (?, ?)";
-                    String update = "UPDATE " + getRepositoryName() + " SET " 
+ EXCHANGE + " = ? WHERE " + ID + " = ?";
-
                     boolean present = jdbcTemplate.queryForInt(
                             "SELECT COUNT(*) FROM " + getRepositoryName() + " 
WHERE " + ID + " = ?", key) != 0;
-                    sql = present ? update : insert;
 
                     // Recover existing exchange with that ID
                     if (isReturnOldExchange() && present) {
                         result = get(key, getRepositoryName(), camelContext);
                     }
 
-                    jdbcTemplate.execute(sql,
-                            new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
-                                @Override
-                                protected void setValues(PreparedStatement ps, 
LobCreator lobCreator) throws SQLException {
-                                    lobCreator.setBlobAsBytes(ps, 1, data);
-                                    ps.setString(2, key);
-                                }
-                            });
+                    if (present) {
+                        update(camelContext, correlationId, exchange, 
getRepositoryName());
+                    } else {
+                        insert(camelContext, correlationId, exchange, 
getRepositoryName());
+                    }
 
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new RuntimeException("Error adding to repository " + 
repositoryName + " with key " + key, e);
                 }
 
                 return result;
             }
         });
+    }
 
+    /**
+     * Updates the current exchange details in the given repository table
+     *
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
+     * @param exchange       the aggregated exchange
+     * @param repositoryName The name of the table
+     * @throws Exception if marshalling the exchange or executing the UPDATE statement fails
+     */
+    protected void update(final CamelContext camelContext, final String key, 
final Exchange exchange, String repositoryName) throws Exception {
+        StringBuilder queryBuilder = new StringBuilder()
+                .append("UPDATE ").append(repositoryName)
+                .append(" SET ")
+                .append(EXCHANGE).append(" = ?");
+        if (storeBodyAsText) {
+            queryBuilder.append(", ").append(BODY).append(" = ?");
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : headersToStoreAsText) {
+                queryBuilder.append(", ").append(headerName).append(" = ?");
+            }
+        }
+
+        queryBuilder.append(" WHERE ").append(ID).append(" = ?");
+
+        String sql = queryBuilder.toString();
+        insertAndUpdateHelper(camelContext, key, exchange, sql, false);
+    }
+
+    /**
+     * Inserts a new record into the given repository table
+     *
+     * @param camelContext   the current CamelContext
+     * @param correlationId  the correlation key
+     * @param exchange       the aggregated exchange
+     * @param repositoryName The name of the table
+     * @throws Exception if marshalling the exchange or executing the INSERT statement fails
+     */
+    protected void insert(final CamelContext camelContext, final String 
correlationId, final Exchange exchange, String repositoryName) throws Exception 
{
+        // The default totalParameterIndex is 2 for ID and Exchange. Depending 
on logic this will be increased
+        int totalParameterIndex = 2;
+        StringBuilder queryBuilder = new StringBuilder()
+                .append("INSERT INTO ").append(repositoryName)
+                .append('(')
+                .append(EXCHANGE).append(", ")
+                .append(ID);
+
+        if (storeBodyAsText) {
+            queryBuilder.append(", ").append(BODY);
+            totalParameterIndex++;
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : headersToStoreAsText) {
+                queryBuilder.append(", ").append(headerName);
+                totalParameterIndex++;
+            }
+        }
+
+        queryBuilder.append(") VALUES (");
+
+        for (int i = 0; i < totalParameterIndex - 1; i++) {
+            queryBuilder.append("?, ");
+        }
+        queryBuilder.append("?)");
+
+        String sql = queryBuilder.toString();
+
+        insertAndUpdateHelper(camelContext, correlationId, exchange, sql, 
true);
+    }
+
+    protected void insertAndUpdateHelper(final CamelContext camelContext, 
final String key, final Exchange exchange, String sql, final boolean 
idComesFirst) throws Exception {
+        final byte[] data = codec.marshallExchange(camelContext, exchange);
+        jdbcTemplate.execute(sql,
+                new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+                    @Override
+                    protected void setValues(PreparedStatement ps, LobCreator 
lobCreator) throws SQLException {
+                        int totalParameterIndex = 0;
+                        lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, 
data);
+                        if (idComesFirst) {
+                            ps.setString(++totalParameterIndex, key);
+                        }
+                        if (storeBodyAsText) {
+                            ps.setString(++totalParameterIndex, 
exchange.getIn().getBody(String.class));
+                        }
+                        if (hasHeadersToStoreAsText()) {
+                            for (String headerName : headersToStoreAsText) {
+                                String headerValue = 
exchange.getIn().getHeader(headerName, String.class);
+                                ps.setString(++totalParameterIndex, 
headerValue);
+                            }
+                        }
+                        if (!idComesFirst) {
+                            ps.setString(++totalParameterIndex, key);
+                        }
+                    }
+                });
     }
 
+    @Override
     public Exchange get(final CamelContext camelContext, final String 
correlationId) {
         final String key = correlationId;
         Exchange result = get(key, getRepositoryName(), camelContext);
@@ -183,34 +274,27 @@ public class JdbcAggregationRepository e
         });
     }
 
+    @Override
     public void remove(final CamelContext camelContext, final String 
correlationId, final Exchange exchange) {
         transactionTemplate.execute(new TransactionCallbackWithoutResult() {
             protected void doInTransactionWithoutResult(TransactionStatus 
status) {
                 final String key = correlationId;
                 final String confirmKey = exchange.getExchangeId();
                 try {
-                    final byte[] data = codec.marshallExchange(camelContext, 
exchange);
-
                     LOG.debug("Removing key [{}]", key);
 
-                    jdbcTemplate.update("DELETE FROM " + getRepositoryName() + 
" WHERE " + ID + " = ?",
-                            new Object[]{key});
+                    jdbcTemplate.update("DELETE FROM " + getRepositoryName() + 
" WHERE " + ID + " = ?", key);
+
+                    insert(camelContext, confirmKey, exchange, 
getRepositoryNameCompleted());
 
-                    jdbcTemplate.execute("INSERT INTO " + 
getRepositoryNameCompleted() + " (" + EXCHANGE + ", " + ID + ") VALUES (?, ?)",
-                            new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
-                                @Override
-                                protected void setValues(PreparedStatement ps, 
LobCreator lobCreator) throws SQLException {
-                                    lobCreator.setBlobAsBytes(ps, 1, data);
-                                    ps.setString(2, confirmKey);
-                                }
-                            });
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new RuntimeException("Error removing key " + key + " 
from repository " + repositoryName, e);
                 }
             }
         });
     }
 
+    @Override
     public void confirm(final CamelContext camelContext, final String 
exchangeId) {
         transactionTemplate.execute(new TransactionCallbackWithoutResult() {
             protected void doInTransactionWithoutResult(TransactionStatus 
status) {
@@ -224,26 +308,26 @@ public class JdbcAggregationRepository e
         });
     }
 
+    @Override
     public Set<String> getKeys() {
-        return transactionTemplateReadOnly.execute(new 
TransactionCallback<LinkedHashSet<String>>() {
-            public LinkedHashSet<String> doInTransaction(TransactionStatus 
status) {
-                List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM 
" + getRepositoryName(),
-                        new RowMapper<String>() {
-                            public String mapRow(ResultSet rs, int rowNum) 
throws SQLException {
-                                String id = rs.getString(ID);
-                                LOG.trace("getKey [{}]", id);
-                                return id;
-                            }
-                        });
-                return new LinkedHashSet<String>(keys);
-            }
-        });
+        return getKeys(getRepositoryName());
     }
 
+    @Override
     public Set<String> scan(CamelContext camelContext) {
+        return getKeys(getRepositoryNameCompleted());
+    }
+
+    /**
+     * Returns the keys in the given repository
+     *
+     * @param repositoryName The name of the table
+     * @return Set of keys in the given repository name
+     */
+    protected Set<String> getKeys(final String repositoryName) {
         return transactionTemplateReadOnly.execute(new 
TransactionCallback<LinkedHashSet<String>>() {
             public LinkedHashSet<String> doInTransaction(TransactionStatus 
status) {
-                List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM 
" + getRepositoryNameCompleted(),
+                List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM 
" + repositoryName,
                         new RowMapper<String>() {
                             public String mapRow(ResultSet rs, int rowNum) 
throws SQLException {
                                 String id = rs.getString(ID);
@@ -256,6 +340,7 @@ public class JdbcAggregationRepository e
         });
     }
 
+    @Override
     public Exchange recover(CamelContext camelContext, String exchangeId) {
         final String key = exchangeId;
         Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
@@ -309,6 +394,22 @@ public class JdbcAggregationRepository e
         this.returnOldExchange = returnOldExchange;
     }
 
+    public void setJdbcCamelCodec(JdbcCamelCodec codec) {
+        this.codec = codec;
+    }
+
+    public boolean hasHeadersToStoreAsText() {
+        return this.headersToStoreAsText != null && 
!this.headersToStoreAsText.isEmpty();
+    }
+
+    public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
+        this.headersToStoreAsText = headersToStoreAsText;
+    }
+
+    public void setStoreBodyAsText(boolean storeBodyAsText) {
+        this.storeBodyAsText = storeBodyAsText;
+    }
+
     /**
      * @return the lobHandler
      */

Modified: 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java?rev=1434700&r1=1434699&r2=1434700&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
 (original)
+++ 
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
 Thu Jan 17 14:49:29 2013
@@ -31,7 +31,7 @@ import org.apache.camel.impl.DefaultExch
 /**
  * Adapted from HawtDBCamelCodec
  */
-public final class JdbcCamelCodec {
+public class JdbcCamelCodec {
 
     public byte[] marshallExchange(CamelContext camelContext, Exchange 
exchange) throws IOException {
         // use DefaultExchangeHolder to marshal to a serialized object

Added: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java?rev=1434700&view=auto
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
 (added)
+++ 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
 Thu Jan 17 14:49:29 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Tests to ensure that arbitrary headers can be stored as raw text within a 
dataSource
+ * Tests to ensure the body can be stored as readable text within a dataSource
+ */
+public class JdbcAggregateStoreAsText extends CamelSpringTestSupport {
+    protected JdbcAggregationRepository repo;
+    protected JdbcTemplate jdbcTemplate;
+    protected DataSource dataSource;
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml");
+    }
+
+    @Override
+    public void postProcessTest() throws Exception {
+        super.postProcessTest();
+
+        repo = applicationContext.getBean("repo3", 
JdbcAggregationRepository.class);
+        dataSource = context.getRegistry().lookup("dataSource3", 
DataSource.class);
+        jdbcTemplate = new JdbcTemplate(dataSource);
+        jdbcTemplate.afterPropertiesSet();
+    }
+
+    @Test
+    public void testStoreBodyAsTextAndCompanyNameHeaderAndAccountNameHeader() 
throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("ABCDE");
+
+        repo.setStoreBodyAsText(true);
+        repo.setHeadersToStoreAsText(Arrays.asList("companyName", 
"accountName"));
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", 123);
+        headers.put("companyName", "Acme");
+        headers.put("accountName", "Alan");
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        assertEquals("A", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        assertEquals("AB", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "C", headers);
+        assertEquals("ABC", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "D", headers);
+        assertEquals("ABCD", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "E", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testStoreBodyAsTextAndNoHeaders() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("ABCDE");
+
+        repo.setStoreBodyAsText(true);
+        repo.setHeadersToStoreAsText(null);
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", 123);
+        headers.put("companyName", "Acme");
+        headers.put("accountName", "Alan");
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        assertEquals("A", getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals(null, getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        assertEquals("AB", getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals(null, getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "C", headers);
+        assertEquals("ABC", getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals(null, getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "D", headers);
+        assertEquals("ABCD", getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals(null, getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "E", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOnlyAccountNameHeaders() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("ABCDE");
+
+        repo.setStoreBodyAsText(false);
+        repo.setHeadersToStoreAsText(Arrays.asList("accountName"));
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("id", 123);
+        headers.put("companyName", "Acme");
+        headers.put("accountName", "Alan");
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        assertEquals(null, getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        assertEquals(null, getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "C", headers);
+        assertEquals(null, getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "D", headers);
+        assertEquals(null, getAggregationRepositoryBody(123));
+        assertEquals(null, getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "E", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public String getAggregationRepositoryBody(int id) throws Exception {
+        return getAggregationRepositoryColumn(id, "body");
+    }
+
+    public String getAggregationRepositoryCompanyName(int id) throws Exception 
{
+        return getAggregationRepositoryColumn(id, "companyName");
+    }
+
+    public String getAggregationRepositoryAccountName(int id) throws Exception 
{
+        return getAggregationRepositoryColumn(id, "accountName");
+    }
+
+    public String getAggregationRepositoryColumn(int id, String columnName) {
+        return jdbcTemplate.queryForObject("SELECT " + columnName + " from 
aggregationRepo3 where id = ?", String.class, id);
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateStoreAsText.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml?rev=1434700&view=auto
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
 (added)
+++ 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
 Thu Jan 17 14:49:29 2013
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <import resource="JdbcSpringDataSource.xml" />
+
+    <!-- aggregate the messages using this strategy -->
+    <bean id="myAggregatorStrategy" 
class="org.apache.camel.processor.aggregate.jdbc.AbstractJdbcAggregationTestSupport$MyAggregationStrategy"/>
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" 
trace="true">
+        <route>
+            <from uri="direct:start"/>
+            <!-- aggregate using our strategy and jdbc repo, and complete when 
we have 5 messages aggregated -->
+            <aggregate strategyRef="myAggregatorStrategy" 
aggregationRepositoryRef="repo3" completionSize="5">
+                <!-- correlate by header with the key id -->
+                
<correlationExpression><header>id</header></correlationExpression>
+                <!-- send aggregated messages to the mock endpoint -->
+                <to uri="mock:aggregated"/>
+            </aggregate>
+        </route>
+    </camelContext>
+
+</beans>
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateStoreAsText.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Modified: 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml?rev=1434700&r1=1434699&r2=1434700&view=diff
==============================================================================
--- 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
 (original)
+++ 
camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
 Thu Jan 17 14:49:29 2013
@@ -44,6 +44,11 @@
         <jdbc:script location="classpath:/sql/init2.sql"/>
     </jdbc:embedded-database>
 
+    <!-- In Memory Database #3 -->
+    <jdbc:embedded-database id="dataSource3" type="DERBY">
+      <jdbc:script location="classpath:/sql/init3.sql"/>
+    </jdbc:embedded-database>
+
     <bean id="repo1" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
         <property name="repositoryName" value="aggregationRepo1" />
         <property name="transactionManager" ref="txManager1" />
@@ -56,12 +61,32 @@
         <property name="dataSource" ref="dataSource2" />
     </bean>
 
+    <!-- START SNIPPET: e3 -->
+    <bean id="repo3" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+        <property name="repositoryName" value="aggregationRepo3"/>
+        <property name="transactionManager" ref="txManager3"/>
+        <property name="dataSource" ref="dataSource3"/>
+        <!-- configure to store the message body and following headers as text 
in the repo -->
+        <property name="storeBodyAsText" value="true"/>
+        <property name="headersToStoreAsText">
+               <list>
+                       <value>companyName</value>
+                   <value>accountName</value>
+               </list>
+        </property>
+    </bean>
+    <!-- END SNIPPET: e3 -->
+
     <bean id="txManager1" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
       <property name="dataSource" ref="dataSource1"/>
     </bean>
 
-       <bean id="txManager2" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+         <bean id="txManager2" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
       <property name="dataSource" ref="dataSource2"/>
     </bean>
 
+         <bean id="txManager3" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+      <property name="dataSource" ref="dataSource3"/>
+    </bean>
+
 </beans>

Added: camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql?rev=1434700&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql (added)
+++ camel/trunk/components/camel-sql/src/test/resources/sql/init3.sql Thu Jan 
17 14:49:29 2013
@@ -0,0 +1,33 @@
+-- ------------------------------------------------------------------------
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- ------------------------------------------------------------------------
+
+CREATE TABLE aggregationRepo3 (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    body varchar(1000),
+    companyName varchar(1000),
+    accountName varchar(1000),
+    constraint aggregationRepo3_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo3_completed (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    body varchar(1000),
+    companyName varchar(1000),
+    accountName varchar(1000),
+    constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
+);
\ No newline at end of file


Reply via email to