This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new c146987  CAMEL-17800 add tests in camel-sql-starter (#472)
c146987 is described below

commit c146987cd321204e2492ffc2b739b5422db2e165
Author: JiriOndrusek <ondrusek.j...@gmail.com>
AuthorDate: Sat Mar 19 09:02:06 2022 +0100

    CAMEL-17800 add tests in camel-sql-starter (#472)
---
 components-starter/camel-sql-starter/pom.xml       |  15 ++
 .../org/apache/camel/component/sql/BaseSql.java    |  70 +++++++++
 .../sql/DataSourceAutoConfigurationTest.java       |  72 ---------
 .../sql/SqlConsumerDynamicParameterTest.java       | 114 ++++++++++++++
 .../sql/SqlEndpointMisconfigureDataSourceTest.java | 104 +++++++++++++
 .../camel/component/sql/SqlGeneratedKeysTest.java  | 106 +++++++++++++
 .../sql/SqlProducerExpressionParameterTest.java    |  94 ++++++++++++
 .../camel/component/sql/SqlProducerInTest.java     |  93 ++++++++++++
 .../component/sql/SqlProducerOutputHeaderTest.java |  83 ++++++++++
 .../sql/SqlProducerOutputTypeStreamListTest.java   |  96 ++++++++++++
 .../camel/component/sql/SqlProducerToDTest.java    |  92 +++++++++++
 .../sql/SqlProducerUpdateHeadersTest.java          |  82 ++++++++++
 .../sql/SqlProducerUseMessageBodyForSqlTest.java   | 153 +++++++++++++++++++
 .../component/sql/SqlTransactedRouteTest.java      | 168 +++++++++++++++++++++
 .../camel/component/sql/aggregation/HeaderDto.java |  60 ++++++++
 .../JdbcAggregateRecoverDeadLetterChannelTest.java | 130 ++++++++++++++++
 .../JdbcAggregateSerializedHeadersTest.java        | 128 ++++++++++++++++
 .../aggregation/JdbcAggregateStoreAsTextTest.java  | 161 ++++++++++++++++++++
 .../sql/aggregation/MyAggregationStrategy.java     |  19 +++
 .../CustomizedJdbcMessageIdRepositoryTest.java     | 131 ++++++++++++++++
 .../JdbcCachedMessageIdRepositoryTest.java         | 128 ++++++++++++++++
 .../idempotent/JdbcMessageIdRepositoryTest.java    | 126 ++++++++++++++++
 .../component/sql/support/DummyJDBCDriver.java     |  66 --------
 ...dbcOrphanLockAwareIdempotentRepositoryTest.java | 106 +++++++++++++
 .../resources/sql/createAndPopulateDatabase.sql    |  23 +++
 .../resources/sql/createAndPopulateDatabase3.sql   |  27 ++++
 .../sql/idempotentWithOrphanLockRemoval.sql        |  14 ++
 .../src/test/resources/sql/init.sql                |  29 ++++
 .../src/test/resources/sql/init3.sql               |  35 +++++
 .../src/test/resources/sql/selectProjectsIn.sql    |  22 +++
 30 files changed, 2409 insertions(+), 138 deletions(-)

diff --git a/components-starter/camel-sql-starter/pom.xml 
b/components-starter/camel-sql-starter/pom.xml
index 58bbc46..54be447 100644
--- a/components-starter/camel-sql-starter/pom.xml
+++ b/components-starter/camel-sql-starter/pom.xml
@@ -45,6 +45,21 @@
       <version>${spring-boot-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>org.apache.camel.springboot</groupId>
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/BaseSql.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/BaseSql.java
new file mode 100644
index 0000000..22e528c
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/BaseSql.java
@@ -0,0 +1,70 @@
+package org.apache.camel.component.sql;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Configuration;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import javax.sql.DataSource;
+
+public class BaseSql {
+
+    @Autowired
+    protected CamelContext context;
+
+    @Autowired
+    protected ProducerTemplate template;
+
+    protected static EmbeddedDatabase initEmptyDb() {
+        return initDb(null);
+    }
+
+    protected static EmbeddedDatabase initDb() {
+        return initDb("sql/createAndPopulateDatabase.sql");
+    }
+
+    protected static EmbeddedDatabase initDb(String script) {
+        return initDb(EmbeddedDatabaseType.DERBY, script);
+    }
+
+    protected static EmbeddedDatabase initDb(EmbeddedDatabaseType type, String 
script) {
+        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder()
+                .setName(BaseSql.class.getSimpleName())
+                .setType(type);
+        if(script != null && !"".equals(script)) {
+            builder.addScript(script);
+        }
+        EmbeddedDatabase ed =  builder.build();
+        return ed;
+    }
+
+    protected void assertMockEndpointsSatisfied() throws InterruptedException {
+        MockEndpoint.assertIsSatisfied(this.context);
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DisposableBean embeddedDatabaseShutdownExecutor(DataSource 
dataSource) {
+            return new DisposableBean() {
+
+                @Override
+                public void destroy() throws Exception {
+                    ((EmbeddedDatabase)dataSource).shutdown();
+                }
+
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/DataSourceAutoConfigurationTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/DataSourceAutoConfigurationTest.java
deleted file mode 100644
index 06256a2..0000000
--- 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/DataSourceAutoConfigurationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sql;
-
-import javax.sql.DataSource;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.component.sql.stored.SqlStoredComponent;
-import org.apache.camel.component.sql.stored.SqlStoredEndpoint;
-import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.ContextConfiguration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-@CamelSpringBootTest
-@SpringBootApplication
-@DirtiesContext
-@ContextConfiguration(classes = DataSourceAutoConfigurationTest.class)
-@SpringBootTest(properties = {
-        "spring.datasource.url=jdbc:dummy://localhost/test",
-        "spring.datasource.username=dbuser",
-        "spring.datasource.password=dbpass",
-        
"spring.datasource.driver-class-name=org.apache.camel.component.sql.support.DummyJDBCDriver"
-})
-public class DataSourceAutoConfigurationTest {
-
-    @Autowired
-    private DataSource datasource;
-
-    @Autowired
-    private CamelContext context;
-
-    @Test
-    public void testInjectionWorks() {
-        assertNotNull(datasource);
-    }
-
-    @Test
-    public void testSqlComponentUsesTheConfiguredDatasource() throws Exception 
{
-        SqlComponent component = (SqlComponent) context.getComponent("sql");
-        SqlEndpoint endpoint = (SqlEndpoint) 
component.createEndpoint("sql:select * from table where id=#");
-        assertEquals(datasource, endpoint.getJdbcTemplate().getDataSource());
-    }
-
-    @Test
-    public void testSqlStoredComponentUsesTheConfiguredDatasource() throws 
Exception {
-        SqlStoredComponent component = (SqlStoredComponent) 
context.getComponent("sql-stored");
-        SqlStoredEndpoint endpoint = (SqlStoredEndpoint) 
component.createEndpoint("sql:select * from table where id=#");
-        assertEquals(datasource, endpoint.getJdbcTemplate().getDataSource());
-    }
-
-}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
new file mode 100644
index 0000000..d889e11
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlConsumerDynamicParameterTest.class,
+                SqlConsumerDynamicParameterTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlConsumerDynamicParameterTest extends BaseSql {
+
+    @EndpointInject("mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @Test
+    public void testDynamicConsumer() throws Exception {
+        MyIdGenerator idGenerator = new MyIdGenerator();
+        context.getRegistry().bind("myIdGenerator", idGenerator);
+        resultEndpoint.expectedMessageCount(3);
+
+        context.getRouteController().startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = resultEndpoint.getReceivedExchanges();
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", 
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", 
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", 
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // and the bean id should be > 1
+        assertTrue(idGenerator.getId() > 1, "Id counter should be > 1");
+    }
+
+    public static class MyIdGenerator {
+
+        private int id = 1;
+
+        public int nextId() {
+            return id++;
+        }
+
+        public int getId() {
+            return id;
+        }
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb(EmbeddedDatabaseType.HSQL, 
"sql/createAndPopulateDatabase.sql");
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("sql:select * from projects where id = 
:#${bean:myIdGenerator.nextId}?initialDelay=0&delay=50")
+                            .routeId("foo").noAutoStartup()
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlEndpointMisconfigureDataSourceTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlEndpointMisconfigureDataSourceTest.java
new file mode 100644
index 0000000..fc33dc4
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlEndpointMisconfigureDataSourceTest.java
@@ -0,0 +1,104 @@
+package org.apache.camel.component.sql;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.camel.FailedToCreateRouteException;
+import org.apache.camel.PropertyBindingException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlEndpointMisconfigureDataSourceTest.class,
+                SqlEndpointMisconfigureDataSourceTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlEndpointMisconfigureDataSourceTest extends BaseSql {
+
+
+    @Test
+    public void testFail() {
+        RouteBuilder rb = new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("sql:foo?dataSource=myDataSource")
+                        .to("mock:result");
+            }
+        };
+
+        FailedToCreateRouteException e = 
assertThrows(FailedToCreateRouteException.class, () -> context.addRoutes(rb),
+                "Should throw exception");
+
+        PropertyBindingException pbe = (PropertyBindingException) 
e.getCause().getCause();
+        assertEquals("dataSource", pbe.getPropertyName());
+        assertEquals("myDataSource", pbe.getValue());
+    }
+
+    @Test
+    public void testOk() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("sql:foo?dataSource=#myDataSource")
+                        .to("mock:result");
+            }
+        });
+        assertDoesNotThrow(() -> context.start());
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean(name = "myDataSource")
+        public DataSource dataSource() {
+            return initDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:query").to("sql:select max(id) from 
projects?outputType=SelectOne&outputHeader=MaxProjectID")
+                            .to("mock:query");
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
new file mode 100644
index 0000000..855ce90
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlGeneratedKeysTest.class,
+                SqlGeneratedKeysTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlGeneratedKeysTest extends BaseSql {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRetrieveGeneratedKey() {
+        // first we create our exchange using the endpoint
+        Endpoint endpoint = context.getEndpoint("direct:insert");
+
+        Object body = new Object[] { "project x", "ASF", "new project" };
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody(body);
+        exchange.getIn().setHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, 
true);
+
+        // now we send the exchange to the endpoint, and receives the response 
from Camel
+        Exchange out = template.send(endpoint, exchange);
+
+        // assertions of the response
+        assertNotNull(out);
+        assertNotNull(out.getMessage());
+        
assertNotNull(out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA));
+        assertSame(body, out.getMessage().getBody());
+
+        List<Map<String, Object>> generatedKeys = 
out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, List.class);
+        assertNotNull(generatedKeys, "out body could not be converted to a 
List - was: "
+                                     + out.getMessage().getBody());
+        assertEquals(1, generatedKeys.get(0).size());
+
+        Map<String, Object> row = generatedKeys.get(0);
+        assertEquals(3, row.get("ID"), "auto increment value should be 3");
+
+        assertEquals(1, 
out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT),
+                "generated keys row count should be one");
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb(EmbeddedDatabaseType.HSQL, 
"sql/createAndPopulateDatabase3.sql");
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:insert").to("sql:insert into projects 
(project, license, description) values (#, #, #)");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerExpressionParameterTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerExpressionParameterTest.java
new file mode 100644
index 0000000..02a99ba
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerExpressionParameterTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerExpressionParameterTest.class,
+                SqlProducerExpressionParameterTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerExpressionParameterTest extends BaseSql {
+
+    @EndpointInject("mock:result")
+    MockEndpoint result;
+
+    @Test
+    public void testNamedParameterFromExpression() throws Exception {
+        result.expectedMessageCount(1);
+
+        template.sendBodyAndProperty("direct:start", "This is a dummy body", 
"license", "ASF");
+
+        result.assertIsSatisfied();
+
+        List<?> received = assertIsInstanceOf(List.class, 
result.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals(2, received.size());
+        Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
+        assertEquals("Camel", row.get("PROJECT"));
+        row = assertIsInstanceOf(Map.class, received.get(1));
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb(EmbeddedDatabaseType.HSQL, 
"sql/createAndPopulateDatabase3.sql");
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:start").to("sql:select * from projects where 
license = :#${exchangeProperty.license} order by id")
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java
new file mode 100644
index 0000000..4738535
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerInTest.class,
+                SqlProducerInTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerInTest extends BaseSql {
+
+    @EndpointInject("mock:query")
+    private MockEndpoint queryEndpoint;
+
+    @Test
+    public void testQueryInString() throws InterruptedException {
+        queryEndpoint.expectedMessageCount(1);
+
+        template.requestBodyAndHeader("direct:query", "Hi there!", "names", 
"Camel,AMQ");
+
+        assertMockEndpointsSatisfied();
+
+        List list = 
queryEndpoint.getReceivedExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(2, list.size());
+        Map row = (Map) list.get(0);
+        assertEquals("Camel", row.get("PROJECT"));
+        row = (Map) list.get(1);
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb(EmbeddedDatabaseType.HSQL, 
"sql/createAndPopulateDatabase.sql");
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:query")
+                            .to("sql:classpath:sql/selectProjectsIn.sql")
+                            .to("log:query")
+                            .to("mock:query");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputHeaderTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputHeaderTest.java
new file mode 100644
index 0000000..42719eb
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputHeaderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerOutputHeaderTest.class,
+                SqlProducerOutputHeaderTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerOutputHeaderTest extends BaseSql {
+
+    @EndpointInject("mock:query")
+    private MockEndpoint queryEndpoint;
+
+    @Test
+    public void testQueryOutputHeader() throws InterruptedException {
+        queryEndpoint.expectedMessageCount(1);
+        queryEndpoint.expectedHeaderReceived(SqlConstants.SQL_ROW_COUNT, 1);
+        queryEndpoint.expectedHeaderReceived("MaxProjectID", 3);
+        queryEndpoint.message(0).body().isEqualTo("Hi there!");
+
+        template.requestBody("direct:query", "Hi there!");
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:query").to("sql:select max(id) from 
projects?outputType=SelectOne&outputHeader=MaxProjectID")
+                            .to("mock:query");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
new file mode 100644
index 0000000..4d1b0c7
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerOutputTypeStreamListTest.class,
+                SqlProducerOutputTypeStreamListTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerOutputTypeStreamListTest extends BaseSql {
+
+
+    @EndpointInject("mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @Test
+    public void testSplit() throws Exception {
+        resultEndpoint.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplit", "testmsg");
+
+        assertMockEndpointsSatisfied();
+        assertThat(resultBodyAt(resultEndpoint, 0), instanceOf(Map.class));
+        assertThat(resultBodyAt(resultEndpoint, 1), instanceOf(Map.class));
+        assertThat(resultBodyAt(resultEndpoint, 2), instanceOf(Map.class));
+    }
+
+    private Object resultBodyAt(MockEndpoint result, int index) {
+        return result.assertExchangeReceived(index).getIn().getBody();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:withSplit")
+                            .to("sql:select * from projects order by 
id?outputType=StreamList")
+                            .to("log:stream")
+                            .split(body()).streaming()
+                            .to("log:row")
+                            .to("mock:result")
+                            .end();
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerToDTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerToDTest.java
new file mode 100644
index 0000000..199e029
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerToDTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerToDTest.class,
+                SqlProducerToDTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerToDTest extends BaseSql {
+
+    @EndpointInject("mock:query")
+    private MockEndpoint queryEndpoint;
+
+    @Test
+    public void testToD() throws InterruptedException {
+        queryEndpoint.expectedMessageCount(1);
+
+        template.requestBodyAndHeader("direct:query", "Hi there!", "foo", 
"AMQ");
+
+        assertMockEndpointsSatisfied();
+
+        List list = 
queryEndpoint.getReceivedExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(1, list.size());
+        Map row = (Map) list.get(0);
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean(name = "myDS")
+        public DataSource dataSource() {
+            return initDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:query")
+                            .setHeader("myQuery", constant("select * from 
projects where project = :#foo order by id"))
+                            .toD("sql:${header.myQuery}?dataSource=#myDS")
+                            .to("log:query")
+                            .to("mock:query");
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUpdateHeadersTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUpdateHeadersTest.java
new file mode 100644
index 0000000..ebb0f7a
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUpdateHeadersTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerUpdateHeadersTest.class,
+                SqlProducerUpdateHeadersTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerUpdateHeadersTest extends BaseSql {
+
+    @EndpointInject("mock:update")
+    private MockEndpoint updateEndpoint;
+
+    @Test
+    public void testUpdateNoop() throws InterruptedException {
+        updateEndpoint.expectedMessageCount(1);
+        updateEndpoint.expectedHeaderReceived(SqlConstants.SQL_UPDATE_COUNT, 
1);
+        updateEndpoint.message(0).body().isEqualTo("Hi there!");
+
+        template.requestBody("direct:update", "Hi there!");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:update").to("sql:update projects set 
license='MIT' where id=3").to("mock:update");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
new file mode 100644
index 0000000..449a2d9
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlProducerUseMessageBodyForSqlTest.class,
+                SqlProducerUseMessageBodyForSqlTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlProducerUseMessageBodyForSqlTest extends BaseSql {
+
+    @EndpointInject("mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @EndpointInject("mock:resultInsert")
+    private MockEndpoint resultInsertEndpoint;
+
+    @BeforeEach
+    public void resetMock() throws Exception {
+        resultEndpoint.reset();
+        resultInsertEndpoint.reset();
+    }
+
+    @Test
+    public void testUseMessageBodyForSqlAndHeaderParams() throws Exception {
+        resultEndpoint.reset();
+        resultEndpoint.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", null, "lic", "ASF");
+
+        List<?> received = assertIsInstanceOf(List.class, 
resultEndpoint.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals(2, received.size());
+        Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
+        assertEquals("Camel", row.get("PROJECT"));
+
+        row = assertIsInstanceOf(Map.class, received.get(1));
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+
+    @Test
+    @SuppressWarnings({ "unchecked", "deprecated" })
+    public void testUseMessageBodyForSqlAndCamelSqlParametersBatch() throws 
Exception {
+
+        resultInsertEndpoint.expectedMessageCount(1);
+
+        List<Map<String, Object>> rows = new ArrayList<>();
+        Map<String, Object> row = new HashMap<>();
+        row.put("id", 200);
+        row.put("project", "MyProject1");
+        row.put("lic", "OPEN1");
+        rows.add(row);
+        row = new HashMap<>();
+        row.put("id", 201);
+        row.put("project", "MyProject2");
+        row.put("lic", "OPEN1");
+        rows.add(row);
+        template.sendBodyAndHeader("direct:insert", null, 
SqlConstants.SQL_PARAMETERS, rows);
+
+        String origSql = assertIsInstanceOf(String.class, 
resultInsertEndpoint.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals("insert into projects(id, project, license) 
values(:?id,:?project,:?lic)", origSql);
+
+        assertEquals(null, 
resultInsertEndpoint.getReceivedExchanges().get(0).getOut().getBody());
+
+        // Clear and then use route2 to verify result of above insert select
+        context.removeRoute(context.getRoutes().get(0).getId());
+
+        resultEndpoint.reset();
+        resultEndpoint.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", null, "lic", "OPEN1");
+
+        List<?> received = assertIsInstanceOf(List.class, 
resultEndpoint.getReceivedExchanges().get(0).getIn().getBody());
+        assertEquals(2, received.size());
+        row = assertIsInstanceOf(Map.class, received.get(0));
+        assertEquals("MyProject1", row.get("PROJECT"));
+
+        row = assertIsInstanceOf(Map.class, received.get(1));
+        assertEquals("MyProject2", row.get("PROJECT"));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:start")
+                            .setBody(constant("select * from projects where 
license = :?lic order by id"))
+                            .to("sql://query?useMessageBodyForSql=true")
+                            .to("mock:result");
+
+
+                    from("direct:insert").routeId("baz")
+                            .setBody(constant("insert into projects(id, 
project, license) values(:?id,:?project,:?lic)"))
+                            
.to("sql://query?useMessageBodyForSql=true&batch=true")
+                            .to("mock:resultInsert");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlTransactedRouteTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlTransactedRouteTest.java
new file mode 100644
index 0000000..8bb911a
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlTransactedRouteTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import javax.sql.DataSource;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqlTransactedRouteTest.class,
+                SqlTransactedRouteTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class SqlTransactedRouteTest extends BaseSql {
+
+    private JdbcTemplate jdbc;
+
+    private static String startEndpoint = "direct:start";
+    private static String sqlEndpoint = 
"sql:overriddenByTheHeader?dataSource=#testdb";
+
+    @Autowired
+    private DataSource ds;
+
+    @Autowired
+    private PlatformTransactionManager txMgr;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        jdbc = new JdbcTemplate(ds);
+        jdbc.execute("CREATE TABLE CUSTOMER (ID VARCHAR(15) NOT NULL PRIMARY 
KEY, NAME VARCHAR(100))");
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        Exchange exchange = template.send(startEndpoint, e ->
+            e.getIn().setHeader(SqlConstants.SQL_QUERY, "insert into customer 
values('cust1','cmueller')")
+
+        );
+
+        assertFalse(exchange.isFailed());
+
+        long count = jdbc.queryForObject("select count(*) from customer", 
Long.class);
+        assertEquals(2, count);
+
+        Map<String, Object> map = jdbc.queryForMap("select * from customer 
where id = 'cust1'");
+        assertEquals(2, map.size());
+        assertEquals("cust1", map.get("ID"));
+        assertEquals("cmueller", map.get("NAME"));
+
+        map = jdbc.queryForMap("select * from customer where id = 'cust2'");
+        assertEquals(2, map.size());
+        assertEquals("cust2", map.get("ID"));
+        assertEquals("muellerc", map.get("NAME"));
+    }
+
+    @Test
+    public void testRollbackAfterAnException() throws Exception {
+        Exchange exchange = template.send("direct:start2", new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(SqlConstants.SQL_QUERY, "insert 
into customer values('cust1','cmueller')");
+            }
+        });
+
+        assertTrue(exchange.isFailed());
+
+        long count = jdbc.queryForObject("select count(*) from customer", 
Long.class);
+        assertEquals(0, count);
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean("testdb")
+        public DataSource dataSource() {
+            return initEmptyDb();
+        }
+
+        @Bean(name = "required")
+        public SpringTransactionPolicy 
transactionManager(PlatformTransactionManager platformTransactionManager) {
+            SpringTransactionPolicy txPolicy = new SpringTransactionPolicy();
+            txPolicy.setTransactionManager(platformTransactionManager);
+            txPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
+            return txPolicy;
+        }
+
+        @Bean(name = "txManager")
+        public PlatformTransactionManager transactionManager(DataSource 
dataSource) {
+            return new DataSourceTransactionManager(dataSource);
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:start").routeId("commit")
+                            .transacted("required")
+                            .to(sqlEndpoint)
+                            .process(e -> 
e.getIn().setHeader(SqlConstants.SQL_QUERY,
+                                            "insert into customer 
values('cust2','muellerc')")
+                            )
+                            .to(sqlEndpoint);
+
+                    from("direct:start2").routeId("rollback2")
+                            .transacted("required")
+                            .to(sqlEndpoint)
+                            .process(new Processor() {
+                                @Override
+                                public void process(Exchange exchange) throws 
Exception {
+                                    throw new Exception("forced Exception");
+                                }
+                            });
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/HeaderDto.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/HeaderDto.java
new file mode 100644
index 0000000..f58785e
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/HeaderDto.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.aggregation;
+
+import java.io.Serializable;
+
+public class HeaderDto implements Cloneable, Serializable {
+    private String org;
+    private String type;
+    private int key;
+
+    public HeaderDto(String org, String type, int key) {
+        this.org = org;
+        this.type = type;
+        this.key = key;
+    }
+
+    public int getKey() {
+        return key;
+    }
+
+    public void setKey(int key) {
+        this.key = key;
+    }
+
+    public String getOrg() {
+        return org;
+    }
+
+    public void setOrg(String org) {
+        this.org = org;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    @Override
+    public String toString() {
+        return "HeaderDto [org=" + org + ", type=" + type + ", key=" + key + 
"]";
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateRecoverDeadLetterChannelTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateRecoverDeadLetterChannelTest.java
new file mode 100644
index 0000000..bf2c62e
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateRecoverDeadLetterChannelTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.aggregation;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.BaseSql;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import javax.sql.DataSource;
+import java.util.concurrent.TimeUnit;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JdbcAggregateRecoverDeadLetterChannelTest.class,
+                
JdbcAggregateRecoverDeadLetterChannelTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class JdbcAggregateRecoverDeadLetterChannelTest extends BaseSql {
+
+    @EndpointInject("mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @EndpointInject("mock:aggregated")
+    protected MockEndpoint aggregatedEndpoint;
+
+    @EndpointInject("mock:dead")
+    protected MockEndpoint deadEndpoint;
+
+    @Test
+    public void testJdbcAggregateRecoverDeadLetterChannel() throws Exception {
+        // should fail all times
+        resultEndpoint.expectedMessageCount(0);
+        aggregatedEndpoint.expectedMessageCount(4);
+        deadEndpoint.expectedBodiesReceived("ABCDE");
+        
deadEndpoint.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        
deadEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public PlatformTransactionManager transactionManager(DataSource 
dataSource) {
+            return new DataSourceTransactionManager(dataSource);
+        }
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb("sql/init.sql");
+        }
+
+        @Bean
+        public JdbcAggregationRepository jdbcAggregationRepository(DataSource 
dataSource, PlatformTransactionManager transactionManager) {
+            JdbcAggregationRepository repo = new 
JdbcAggregationRepository(transactionManager, "aggregationRepo1", dataSource);
+
+            // enable recovery
+            repo.setUseRecovery(true);
+            // exhaust after at most 3 attempts
+            repo.setMaximumRedeliveries(3);
+            // and move to this dead letter channel
+            repo.setDeadLetterUri("mock:dead");
+            // check faster
+            repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+
+            return repo;
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder(JdbcAggregationRepository repo) {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    deadLetterChannel("mock:error");
+
+                    from("direct:start")
+                            .aggregate(header("id"), new 
MyAggregationStrategy())
+                            .completionSize(5).aggregationRepository(repo)
+                            .log("aggregated exchange id ${exchangeId} with 
${body}")
+                            .to("mock:aggregated")
+                            .throwException(new 
IllegalArgumentException("Damn"))
+                            .to("mock:result")
+                            .end();
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateSerializedHeadersTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateSerializedHeadersTest.java
new file mode 100644
index 0000000..b17c539
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateSerializedHeadersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.aggregation;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.BaseSql;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.support.lob.AbstractLobHandler;
+import org.springframework.jdbc.support.lob.DefaultLobHandler;
+import org.springframework.jdbc.support.lob.LobCreator;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import javax.sql.DataSource;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JdbcAggregateSerializedHeadersTest.class,
+                JdbcAggregateSerializedHeadersTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class JdbcAggregateSerializedHeadersTest extends BaseSql {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcAggregateSerializedHeadersTest.class);
+    private static final int SIZE = 500;
+
+    @EndpointInject("mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Test
+    public void testLoadTestJdbcAggregate() throws Exception {
+        resultEndpoint.expectedMinimumMessageCount(1);
+        resultEndpoint.setResultWaitTime(50 * 1000);
+
+        LOG.info("Staring to send " + SIZE + " messages.");
+
+        for (int i = 0; i < SIZE; i++) {
+            final int value = 1;
+            HeaderDto headerDto = new HeaderDto("org", "company", 1);
+            LOG.debug("Sending {} with id {}", value, headerDto);
+            template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", 
headerDto);
+        }
+
+        LOG.info("Sending all " + SIZE + " message done. Now waiting for 
aggregation to complete.");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public PlatformTransactionManager transactionManager(DataSource 
dataSource) {
+            return new DataSourceTransactionManager(dataSource);
+        }
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb("sql/init.sql");
+        }
+
+        @Bean
+        public JdbcAggregationRepository jdbcAggregationRepository(DataSource 
dataSource, PlatformTransactionManager transactionManager) {
+            JdbcAggregationRepository repo = new 
JdbcAggregationRepository(transactionManager, "aggregationRepo1", dataSource);
+
+            repo.setAllowSerializedHeaders(true);
+
+            return repo;
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder(JdbcAggregationRepository repo) {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("seda:start?size=" + SIZE)
+                            .to("log:input?groupSize=500")
+                            .aggregate(header("id"), new 
MyAggregationStrategy())
+                            .aggregationRepository(repo)
+                            .completionSize(SIZE)
+                            .to("log:output?showHeaders=true")
+                            .to("mock:result")
+                            .end();
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateStoreAsTextTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateStoreAsTextTest.java
new file mode 100644
index 0000000..9b5667a
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateStoreAsTextTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.aggregation;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.BaseSql;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import javax.sql.DataSource;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JdbcAggregateStoreAsTextTest.class,
+                JdbcAggregateStoreAsTextTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class JdbcAggregateStoreAsTextTest extends BaseSql {
+    protected JdbcTemplate jdbcTemplate;
+
+    @EndpointInject("mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Autowired
+    private DataSource ds;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        jdbcTemplate = new JdbcTemplate(ds);
+        jdbcTemplate.afterPropertiesSet();
+    }
+
+    @Test
+    public void testStoreBodyAsTextAndCompanyNameHeaderAndAccountNameHeader() 
throws Exception {
+        resultEndpoint.expectedBodiesReceived("ABCDE");
+
+
+
+        Map<String, Object> headers = new HashMap<>();
+        headers.put("id", 123);
+        headers.put("companyName", "Acme");
+        headers.put("accountName", "Alan");
+
+        template.sendBodyAndHeaders("direct:start", "A", headers);
+        assertEquals("A", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "B", headers);
+        assertEquals("AB", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "C", headers);
+        assertEquals("ABC", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "D", headers);
+        assertEquals("ABCD", getAggregationRepositoryBody(123));
+        assertEquals("Acme", getAggregationRepositoryCompanyName(123));
+        assertEquals("Alan", getAggregationRepositoryAccountName(123));
+
+        template.sendBodyAndHeaders("direct:start", "E", headers);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public String getAggregationRepositoryBody(int id) {
+        return getAggregationRepositoryColumn(id, "body");
+    }
+
+    public String getAggregationRepositoryCompanyName(int id) {
+        return getAggregationRepositoryColumn(id, "companyName");
+    }
+
+    public String getAggregationRepositoryAccountName(int id) {
+        return getAggregationRepositoryColumn(id, "accountName");
+    }
+
+    public String getAggregationRepositoryColumn(int id, String columnName) {
+        return jdbcTemplate.queryForObject("SELECT " + columnName + " from 
aggregationRepo3 where id = ?", String.class, id);
+    }
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public PlatformTransactionManager transactionManager(DataSource 
dataSource) {
+            return new DataSourceTransactionManager(dataSource);
+        }
+
+        @Bean
+        public DataSource dataSource() {
+            return initDb("sql/init3.sql");
+        }
+
+        @Bean
+        public JdbcAggregationRepository jdbcAggregationRepository(DataSource 
dataSource, PlatformTransactionManager transactionManager) {
+            JdbcAggregationRepository repo = new 
JdbcAggregationRepository(transactionManager, "aggregationRepo3", dataSource);
+
+            repo.setStoreBodyAsText(true);
+            repo.setHeadersToStoreAsText(Arrays.asList("companyName", 
"accountName"));
+
+            return repo;
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder(JdbcAggregationRepository repo) {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:start")
+                            .aggregate(header("id"), new 
MyAggregationStrategy())
+                            .aggregationRepository(repo)
+                            .completionSize(5)
+                            .to("log:output?showHeaders=true")
+                            .to("mock:result")
+                            .end();
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/MyAggregationStrategy.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/MyAggregationStrategy.java
new file mode 100644
index 0000000..9740f54
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/MyAggregationStrategy.java
@@ -0,0 +1,19 @@
+package org.apache.camel.component.sql.aggregation;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+
+public class MyAggregationStrategy implements AggregationStrategy {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        }
+        String body1 = oldExchange.getIn().getBody(String.class);
+        String body2 = newExchange.getIn().getBody(String.class);
+
+        oldExchange.getIn().setBody(body1 + body2);
+        return oldExchange;
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/CustomizedJdbcMessageIdRepositoryTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/CustomizedJdbcMessageIdRepositoryTest.java
new file mode 100644
index 0000000..457a80c
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/CustomizedJdbcMessageIdRepositoryTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.idempotent;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.BaseSql;
+import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                CustomizedJdbcMessageIdRepositoryTest.class,
+                CustomizedJdbcMessageIdRepositoryTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class CustomizedJdbcMessageIdRepositoryTest extends BaseSql {
+
+    protected static final String SELECT_ALL_STRING
+            = "SELECT messageId FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 
processorName = ?";
+    protected static final String PROCESSOR_NAME = "myProcessorName";
+
+    protected JdbcTemplate jdbcTemplate;
+
+    @EndpointInject("mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @EndpointInject("mock:error")
+    protected MockEndpoint errorEndpoint;
+
+    @Autowired
+    private DataSource dataSource;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        jdbcTemplate = new JdbcTemplate(dataSource);
+        jdbcTemplate.afterPropertiesSet();
+    }
+
+    @Test
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+        errorEndpoint.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        assertMockEndpointsSatisfied();
+
+        // all 3 messages should be in jdbc repo
+        List<String> receivedMessageIds = 
jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME);
+
+        assertEquals(3, receivedMessageIds.size());
+        assertTrue(receivedMessageIds.contains("1"));
+        assertTrue(receivedMessageIds.contains("2"));
+        assertTrue(receivedMessageIds.contains("3"));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initEmptyDb();
+        }
+
+
+        @Bean
+        public RouteBuilder routeBuilder(DataSource dataSource) {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    deadLetterChannel("mock:error");
+
+                    JdbcMessageIdRepository repo = new 
JdbcMessageIdRepository(dataSource, PROCESSOR_NAME);
+                    repo.setTableExistsString("SELECT 1 FROM 
CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0");
+                    repo.setCreateString("CREATE TABLE 
CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId 
VARCHAR(100), createdAt TIMESTAMP)");
+                    repo.setQueryString("SELECT COUNT(*) FROM 
CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?");
+                    repo.setInsertString("INSERT INTO 
CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, 
?, ?)");
+                    repo.setDeleteString("DELETE FROM 
CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?");
+
+                    from("direct:start")
+                            .idempotentConsumer(header("messageId"), repo)
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcCachedMessageIdRepositoryTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcCachedMessageIdRepositoryTest.java
new file mode 100644
index 0000000..48638d4
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcCachedMessageIdRepositoryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.idempotent;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.BaseSql;
+import 
org.apache.camel.processor.idempotent.jdbc.JdbcCachedMessageIdRepository;
+import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.sql.Timestamp;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JdbcCachedMessageIdRepositoryTest.class,
+                JdbcCachedMessageIdRepositoryTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class JdbcCachedMessageIdRepositoryTest extends BaseSql {
+
+    private static final String INSERT_STRING
+            = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, 
createdAt) VALUES (?, ?, ?)";
+    private static final String PROCESSOR_NAME = "myProcessorName";
+
+    private JdbcTemplate jdbcTemplate;
+    private DataSource dataSource;
+    private JdbcCachedMessageIdRepository repository;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @EndpointInject("mock:error")
+    private MockEndpoint errorEndpoint;
+
+    @Autowired
+    private DataSource ds;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        jdbcTemplate = new JdbcTemplate(ds);
+        jdbcTemplate.afterPropertiesSet();
+        jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "1", new 
Timestamp(System.currentTimeMillis()));
+        jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "2", new 
Timestamp(System.currentTimeMillis()));
+        repository = context.getRegistry().lookupByNameAndType(PROCESSOR_NAME, 
JdbcCachedMessageIdRepository.class);
+        repository.reload();
+    }
+
+    @Test
+    public void testCacheHit() throws Exception {
+        resultEndpoint.expectedBodiesReceived("three");
+        errorEndpoint.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(5, repository.getHitCount());
+        assertEquals(1, repository.getMissCount());
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initEmptyDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder(DataSource dataSource, CamelContext 
context) {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    deadLetterChannel("mock:error");
+
+                    JdbcMessageIdRepository repo = new 
JdbcCachedMessageIdRepository(dataSource, PROCESSOR_NAME);
+                    context.getRegistry().bind(PROCESSOR_NAME, repo);
+                    from("direct:start")
+                            .idempotentConsumer(header("messageId"), repo)
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcMessageIdRepositoryTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcMessageIdRepositoryTest.java
new file mode 100644
index 0000000..643a231
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcMessageIdRepositoryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql.idempotent;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sql.BaseSql;
+import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.test.annotation.DirtiesContext;
+
+import javax.sql.DataSource;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JdbcMessageIdRepositoryTest.class,
+                JdbcMessageIdRepositoryTest.TestConfiguration.class,
+                BaseSql.TestConfiguration.class
+        }
+)
+public class JdbcMessageIdRepositoryTest extends BaseSql {
+
+    protected static final String SELECT_ALL_STRING = "SELECT messageId FROM 
CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
+    protected static final String CLEAR_STRING = "DELETE FROM 
CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
+    protected static final String PROCESSOR_NAME = "myProcessorName";
+
+    private JdbcTemplate jdbcTemplate;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @EndpointInject("mock:error")
+    private MockEndpoint errorEndpoint;
+
+    @Autowired
+    private DataSource ds;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        jdbcTemplate = new JdbcTemplate(ds);
+        jdbcTemplate.afterPropertiesSet();
+    }
+
+    @Test
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+        errorEndpoint.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        assertMockEndpointsSatisfied();
+
+        // all 3 messages should be in jdbc repo
+        List<String> receivedMessageIds = 
jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME);
+
+        assertEquals(3, receivedMessageIds.size());
+        assertTrue(receivedMessageIds.contains("1"));
+        assertTrue(receivedMessageIds.contains("2"));
+        assertTrue(receivedMessageIds.contains("3"));
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        @Bean
+        public DataSource dataSource() {
+            return initEmptyDb();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder(DataSource dataSource) {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    deadLetterChannel("mock:error");
+
+                    JdbcMessageIdRepository repo = new 
JdbcMessageIdRepository(dataSource, PROCESSOR_NAME);
+                    from("direct:start")
+                            .idempotentConsumer(header("messageId"), repo)
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/support/DummyJDBCDriver.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/support/DummyJDBCDriver.java
deleted file mode 100644
index 0982bc5..0000000
--- 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/support/DummyJDBCDriver.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sql.support;
-
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverPropertyInfo;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-/**
- * Dummy JDBCDriver to be used in tests.
- */
-public class DummyJDBCDriver implements Driver {
-
-    @Override
-    public Connection connect(String s, Properties properties) throws 
SQLException {
-        return null;
-    }
-
-    @Override
-    public boolean acceptsURL(String s) throws SQLException {
-        return false;
-    }
-
-    @Override
-    public DriverPropertyInfo[] getPropertyInfo(String s, Properties 
properties) throws SQLException {
-        return new DriverPropertyInfo[0];
-    }
-
-    @Override
-    public int getMajorVersion() {
-        return 0;
-    }
-
-    @Override
-    public int getMinorVersion() {
-        return 0;
-    }
-
-    @Override
-    public boolean jdbcCompliant() {
-        return false;
-    }
-
-    @Override
-    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
-        return null;
-    }
-}
diff --git 
a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
new file mode 100644
index 0000000..710bf11
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.component.sql.BaseSql;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
+import 
org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository;
+import 
org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository.ProcessorNameAndMessageId;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import java.sql.Timestamp;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class JdbcOrphanLockAwareIdempotentRepositoryTest extends BaseSql {
+
+    private static final String APP_NAME = "APP_1";
+
+    private EmbeddedDatabase dataSource;
+
+    private JdbcOrphanLockAwareIdempotentRepository jdbcMessageIdRepository;
+
+    @BeforeAll
+    public void setup() throws Exception {
+        dataSource = initDb(EmbeddedDatabaseType.HSQL, 
"sql/idempotentWithOrphanLockRemoval.sql");
+        jdbcMessageIdRepository = new 
JdbcOrphanLockAwareIdempotentRepository(dataSource, APP_NAME, new 
DefaultCamelContext());
+        jdbcMessageIdRepository.setLockMaxAgeMillis(3000_00L);
+        jdbcMessageIdRepository.setLockKeepAliveIntervalMillis(3000L);
+        jdbcMessageIdRepository.doInit();
+    }
+
+    @Test
+    public void testLockNotGrantedForCurrentTimeStamp() {
+        assertTrue(jdbcMessageIdRepository.contains("FILE_1"));
+    }
+
+    @Test
+    public void testLockNotGrantedForCurrentTimeStampPlus2Min() {
+        assertTrue(jdbcMessageIdRepository.contains("FILE_2"));
+    }
+
+    @Test
+    public void testLockGrantedForCurrentTimeStampPlus5Min() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_3"));
+    }
+
+    @Test
+    public void testLockKeepAliveWorks() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_4"));
+        jdbcMessageIdRepository.insert("FILE_4");
+        assertTrue(jdbcMessageIdRepository.contains("FILE_4"));
+        JdbcTemplate template = new JdbcTemplate(dataSource);
+        Timestamp timestamp = new Timestamp(System.currentTimeMillis() - 5 * 
60 * 1000L);
+        template.update("UPDATE CAMEL_MESSAGEPROCESSED SET createdAT = ? WHERE 
processorName = ? AND messageId = ?", timestamp,
+                APP_NAME, "FILE_4");
+
+        await().atMost(5, TimeUnit.SECONDS).until(() -> 
!jdbcMessageIdRepository.contains("FILE_4"));
+        jdbcMessageIdRepository.keepAlive();
+        assertTrue(jdbcMessageIdRepository.contains("FILE_4"));
+    }
+
+    @Test
+    public void testInsertQueryDelete() {
+        assertFalse(jdbcMessageIdRepository.contains("FILE_5"));
+        assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+
+        jdbcMessageIdRepository.add("FILE_5");
+
+        assertTrue(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+        assertTrue(jdbcMessageIdRepository.contains("FILE_5"));
+        jdbcMessageIdRepository.remove("FILE_5");
+        assertFalse(jdbcMessageIdRepository.contains("FILE_5"));
+        assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet()
+                .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5")));
+    }
+}
diff --git 
a/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase.sql
 
b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase.sql
new file mode 100644
index 0000000..cc9e4b2
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase.sql
@@ -0,0 +1,23 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- START SNIPPET: e1
+create table projects (id integer primary key, project varchar(10), license 
varchar(5));
+insert into projects values (1, 'Camel', 'ASF');
+insert into projects values (2, 'AMQ', 'ASF');
+insert into projects values (3, 'Linux', 'XXX');
+-- END SNIPPET: e1
\ No newline at end of file
diff --git 
a/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase3.sql
 
b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase3.sql
new file mode 100644
index 0000000..2499f1c
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase3.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- START SNIPPET: e1
+create table projects (id integer primary key GENERATED ALWAYS AS IDENTITY, 
project varchar(10), license varchar(5), description varchar(1000) default 
null);
+insert into projects (project, license, description) values ('Camel', 'ASF', 
'');
+insert into projects (project, license, description) values ('AMQ', 'ASF', '');
+insert into projects (project, license, description) values ('Linux', 'XXX', 
'');
+-- END SNIPPET: e1
+
+-- START SNIPPET: e2
+create table developers (id1 integer primary key GENERATED ALWAYS AS IDENTITY 
(START WITH 5), name varchar(20), position varchar(20), id2 integer GENERATED 
ALWAYS AS (id1+1));
+-- END SNIPPET: e2
\ No newline at end of file
diff --git 
a/components-starter/camel-sql-starter/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
 
b/components-starter/camel-sql-starter/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
new file mode 100644
index 0000000..ce108eb
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql
@@ -0,0 +1,14 @@
+-- Add DDL to create tables, views, indexes, etc needed by tests. These should 
match the expected database structure as it will appear in production.
+SET DATABASE SQL SYNTAX PGS TRUE; -- tells HSQLDB that this schema uses MYSQL 
syntax
+SET PROPERTY "sql.enforce_strict_size" FALSE;
+
+CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId 
VARCHAR(100), createdAt TIMESTAMP);
+
+ALTER TABLE CAMEL_MESSAGEPROCESSED ADD PRIMARY KEY (processorName, messageId);
+
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_1', 
CURRENT_TIMESTAMP);
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 
'FILE_2',TIMESTAMPADD(SQL_TSI_MINUTE, -2, CURRENT_TIMESTAMP));
+
+INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 
'FILE_3',TIMESTAMPADD(SQL_TSI_MINUTE, -5, CURRENT_TIMESTAMP));
diff --git 
a/components-starter/camel-sql-starter/src/test/resources/sql/init.sql 
b/components-starter/camel-sql-starter/src/test/resources/sql/init.sql
new file mode 100644
index 0000000..6e1b3f0
--- /dev/null
+++ b/components-starter/camel-sql-starter/src/test/resources/sql/init.sql
@@ -0,0 +1,29 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+CREATE TABLE aggregationRepo1 (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    version bigint NOT NULL,
+    constraint aggregationRepo1_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo1_completed (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    version bigint NOT NULL,
+    constraint aggregationRepo1_completed_pk PRIMARY KEY (id)
+);
\ No newline at end of file
diff --git 
a/components-starter/camel-sql-starter/src/test/resources/sql/init3.sql 
b/components-starter/camel-sql-starter/src/test/resources/sql/init3.sql
new file mode 100644
index 0000000..baad28a
--- /dev/null
+++ b/components-starter/camel-sql-starter/src/test/resources/sql/init3.sql
@@ -0,0 +1,35 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+CREATE TABLE aggregationRepo3 (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    version bigint NOT NULL,
+    body varchar(1000),
+    companyName varchar(1000),
+    accountName varchar(1000),
+    constraint aggregationRepo3_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo3_completed (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    version bigint NOT NULL,
+    body varchar(1000),
+    companyName varchar(1000),
+    accountName varchar(1000),
+    constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
+);
\ No newline at end of file
diff --git 
a/components-starter/camel-sql-starter/src/test/resources/sql/selectProjectsIn.sql
 
b/components-starter/camel-sql-starter/src/test/resources/sql/selectProjectsIn.sql
new file mode 100644
index 0000000..7f5f590
--- /dev/null
+++ 
b/components-starter/camel-sql-starter/src/test/resources/sql/selectProjectsIn.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- this is a comment
+select *
+from projects
+where project in (:#in:names)
+order by id
\ No newline at end of file

Reply via email to