This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push: new c146987 CAMEL-17800 add tests in camel-sql-starter (#472) c146987 is described below commit c146987cd321204e2492ffc2b739b5422db2e165 Author: JiriOndrusek <ondrusek.j...@gmail.com> AuthorDate: Sat Mar 19 09:02:06 2022 +0100 CAMEL-17800 add tests in camel-sql-starter (#472) --- components-starter/camel-sql-starter/pom.xml | 15 ++ .../org/apache/camel/component/sql/BaseSql.java | 70 +++++++++ .../sql/DataSourceAutoConfigurationTest.java | 72 --------- .../sql/SqlConsumerDynamicParameterTest.java | 114 ++++++++++++++ .../sql/SqlEndpointMisconfigureDataSourceTest.java | 104 +++++++++++++ .../camel/component/sql/SqlGeneratedKeysTest.java | 106 +++++++++++++ .../sql/SqlProducerExpressionParameterTest.java | 94 ++++++++++++ .../camel/component/sql/SqlProducerInTest.java | 93 ++++++++++++ .../component/sql/SqlProducerOutputHeaderTest.java | 83 ++++++++++ .../sql/SqlProducerOutputTypeStreamListTest.java | 96 ++++++++++++ .../camel/component/sql/SqlProducerToDTest.java | 92 +++++++++++ .../sql/SqlProducerUpdateHeadersTest.java | 82 ++++++++++ .../sql/SqlProducerUseMessageBodyForSqlTest.java | 153 +++++++++++++++++++ .../component/sql/SqlTransactedRouteTest.java | 168 +++++++++++++++++++++ .../camel/component/sql/aggregation/HeaderDto.java | 60 ++++++++ .../JdbcAggregateRecoverDeadLetterChannelTest.java | 130 ++++++++++++++++ .../JdbcAggregateSerializedHeadersTest.java | 128 ++++++++++++++++ .../aggregation/JdbcAggregateStoreAsTextTest.java | 161 ++++++++++++++++++++ .../sql/aggregation/MyAggregationStrategy.java | 19 +++ .../CustomizedJdbcMessageIdRepositoryTest.java | 131 ++++++++++++++++ .../JdbcCachedMessageIdRepositoryTest.java | 128 ++++++++++++++++ .../idempotent/JdbcMessageIdRepositoryTest.java | 126 ++++++++++++++++ .../component/sql/support/DummyJDBCDriver.java | 66 -------- ...dbcOrphanLockAwareIdempotentRepositoryTest.java | 106 +++++++++++++ .../resources/sql/createAndPopulateDatabase.sql | 23 +++ .../resources/sql/createAndPopulateDatabase3.sql | 27 ++++ .../sql/idempotentWithOrphanLockRemoval.sql | 14 ++ .../src/test/resources/sql/init.sql | 29 ++++ .../src/test/resources/sql/init3.sql | 35 +++++ .../src/test/resources/sql/selectProjectsIn.sql | 22 +++ 30 files changed, 2409 insertions(+), 138 deletions(-) diff --git a/components-starter/camel-sql-starter/pom.xml b/components-starter/camel-sql-starter/pom.xml index 58bbc46..54be447 100644 --- a/components-starter/camel-sql-starter/pom.xml +++ b/components-starter/camel-sql-starter/pom.xml @@ -45,6 +45,21 @@ <version>${spring-boot-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> <!--START OF GENERATED CODE--> <dependency> <groupId>org.apache.camel.springboot</groupId> diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/BaseSql.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/BaseSql.java new file mode 100644 index 0000000..22e528c --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/BaseSql.java @@ -0,0 +1,70 @@ +package org.apache.camel.component.sql; + +import org.apache.camel.CamelContext; +import org.apache.camel.Configuration; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +import javax.sql.DataSource; + +public class BaseSql { + + @Autowired + protected CamelContext context; + + @Autowired + protected ProducerTemplate template; + + protected static EmbeddedDatabase initEmptyDb() { + return initDb(null); + } + + protected static EmbeddedDatabase initDb() { + return initDb("sql/createAndPopulateDatabase.sql"); + } + + protected static EmbeddedDatabase initDb(String script) { + return initDb(EmbeddedDatabaseType.DERBY, script); + } + + protected static EmbeddedDatabase initDb(EmbeddedDatabaseType type, String script) { + EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder() + .setName(BaseSql.class.getSimpleName()) + .setType(type); + if(script != null && !"".equals(script)) { + builder.addScript(script); + } + EmbeddedDatabase ed = builder.build(); + return ed; + } + + protected void assertMockEndpointsSatisfied() throws InterruptedException { + MockEndpoint.assertIsSatisfied(this.context); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DisposableBean embeddedDatabaseShutdownExecutor(DataSource dataSource) { + return new DisposableBean() { + + @Override + public void destroy() throws Exception { + ((EmbeddedDatabase)dataSource).shutdown(); + } + + }; + } + } +} \ No newline at end of file diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/DataSourceAutoConfigurationTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/DataSourceAutoConfigurationTest.java deleted file mode 100644 index 06256a2..0000000 --- a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/DataSourceAutoConfigurationTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sql; - -import javax.sql.DataSource; - -import org.apache.camel.CamelContext; -import org.apache.camel.component.sql.stored.SqlStoredComponent; -import org.apache.camel.component.sql.stored.SqlStoredEndpoint; -import org.apache.camel.test.spring.junit5.CamelSpringBootTest; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -@CamelSpringBootTest -@SpringBootApplication -@DirtiesContext -@ContextConfiguration(classes = DataSourceAutoConfigurationTest.class) -@SpringBootTest(properties = { - "spring.datasource.url=jdbc:dummy://localhost/test", - "spring.datasource.username=dbuser", - "spring.datasource.password=dbpass", - "spring.datasource.driver-class-name=org.apache.camel.component.sql.support.DummyJDBCDriver" -}) -public class DataSourceAutoConfigurationTest { - - @Autowired - private DataSource datasource; - - @Autowired - private CamelContext context; - - @Test - public void testInjectionWorks() { - assertNotNull(datasource); - } - - @Test - public void testSqlComponentUsesTheConfiguredDatasource() throws Exception { - SqlComponent component = (SqlComponent) context.getComponent("sql"); - SqlEndpoint endpoint = (SqlEndpoint) component.createEndpoint("sql:select * from table where id=#"); - assertEquals(datasource, endpoint.getJdbcTemplate().getDataSource()); - } - - @Test - public void testSqlStoredComponentUsesTheConfiguredDatasource() throws Exception { - SqlStoredComponent component = (SqlStoredComponent) context.getComponent("sql-stored"); - SqlStoredEndpoint endpoint = (SqlStoredEndpoint) component.createEndpoint("sql:select * from table where id=#"); - assertEquals(datasource, endpoint.getJdbcTemplate().getDataSource()); - } - -} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java new file mode 100644 index 0000000..d889e11 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlConsumerDynamicParameterTest.class, + SqlConsumerDynamicParameterTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlConsumerDynamicParameterTest extends BaseSql { + + @EndpointInject("mock:result") + private MockEndpoint resultEndpoint; + + @Test + public void testDynamicConsumer() throws Exception { + MyIdGenerator idGenerator = new MyIdGenerator(); + context.getRegistry().bind("myIdGenerator", idGenerator); + resultEndpoint.expectedMessageCount(3); + + context.getRouteController().startRoute("foo"); + + assertMockEndpointsSatisfied(); + + List<Exchange> exchanges = resultEndpoint.getReceivedExchanges(); + + assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID")); + assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT")); + assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID")); + assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT")); + assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID")); + assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT")); + + // and the bean id should be > 1 + assertTrue(idGenerator.getId() > 1, "Id counter should be > 1"); + } + + public static class MyIdGenerator { + + private int id = 1; + + public int nextId() { + return id++; + } + + public int getId() { + return id; + } + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(EmbeddedDatabaseType.HSQL, "sql/createAndPopulateDatabase.sql"); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sql:select * from projects where id = :#${bean:myIdGenerator.nextId}?initialDelay=0&delay=50") + .routeId("foo").noAutoStartup() + .to("mock:result"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlEndpointMisconfigureDataSourceTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlEndpointMisconfigureDataSourceTest.java new file mode 100644 index 0000000..fc33dc4 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlEndpointMisconfigureDataSourceTest.java @@ -0,0 +1,104 @@ +package org.apache.camel.component.sql;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.camel.FailedToCreateRouteException; +import org.apache.camel.PropertyBindingException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlEndpointMisconfigureDataSourceTest.class, + SqlEndpointMisconfigureDataSourceTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlEndpointMisconfigureDataSourceTest extends BaseSql { + + + @Test + public void testFail() { + RouteBuilder rb = new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("sql:foo?dataSource=myDataSource") + .to("mock:result"); + } + }; + + FailedToCreateRouteException e = assertThrows(FailedToCreateRouteException.class, () -> context.addRoutes(rb), + "Should throw exception"); + + PropertyBindingException pbe = (PropertyBindingException) e.getCause().getCause(); + assertEquals("dataSource", pbe.getPropertyName()); + assertEquals("myDataSource", pbe.getValue()); + } + + @Test + public void testOk() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("sql:foo?dataSource=#myDataSource") + .to("mock:result"); + } + }); + assertDoesNotThrow(() -> context.start()); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean(name = "myDataSource") + public DataSource dataSource() { + return initDb(); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:query").to("sql:select max(id) from projects?outputType=SelectOne&outputHeader=MaxProjectID") + .to("mock:query"); + } + }; + } + } + +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java new file mode 100644 index 0000000..855ce90 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlGeneratedKeysTest.class, + SqlGeneratedKeysTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlGeneratedKeysTest extends BaseSql { + + @Test + @SuppressWarnings("unchecked") + public void testRetrieveGeneratedKey() { + // first we create our exchange using the endpoint + Endpoint endpoint = context.getEndpoint("direct:insert"); + + Object body = new Object[] { "project x", "ASF", "new project" }; + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(body); + exchange.getIn().setHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, true); + + // now we send the exchange to the endpoint, and receives the response from Camel + Exchange out = template.send(endpoint, exchange); + + // assertions of the response + assertNotNull(out); + assertNotNull(out.getMessage()); + assertNotNull(out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA)); + assertSame(body, out.getMessage().getBody()); + + List<Map<String, Object>> generatedKeys = out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, List.class); + assertNotNull(generatedKeys, "out body could not be converted to a List - was: " + + out.getMessage().getBody()); + assertEquals(1, generatedKeys.get(0).size()); + + Map<String, Object> row = generatedKeys.get(0); + assertEquals(3, row.get("ID"), "auto increment value should be 3"); + + assertEquals(1, out.getMessage().getHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT), + "generated keys row count should be one"); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(EmbeddedDatabaseType.HSQL, "sql/createAndPopulateDatabase3.sql"); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:insert").to("sql:insert into projects (project, license, description) values (#, #, #)"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerExpressionParameterTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerExpressionParameterTest.java new file mode 100644 index 0000000..02a99ba --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerExpressionParameterTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; +import java.util.Map; + +import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerExpressionParameterTest.class, + SqlProducerExpressionParameterTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerExpressionParameterTest extends BaseSql { + + @EndpointInject("mock:result") + MockEndpoint result; + + @Test + public void testNamedParameterFromExpression() throws Exception { + result.expectedMessageCount(1); + + template.sendBodyAndProperty("direct:start", "This is a dummy body", "license", "ASF"); + + result.assertIsSatisfied(); + + List<?> received = assertIsInstanceOf(List.class, result.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(2, received.size()); + Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("Camel", row.get("PROJECT")); + row = assertIsInstanceOf(Map.class, received.get(1)); + assertEquals("AMQ", row.get("PROJECT")); + } + + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(EmbeddedDatabaseType.HSQL, "sql/createAndPopulateDatabase3.sql"); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("sql:select * from projects where license = :#${exchangeProperty.license} order by id") + .to("mock:result"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java new file mode 100644 index 0000000..4738535 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerInTest.class, + SqlProducerInTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerInTest extends BaseSql { + + @EndpointInject("mock:query") + private MockEndpoint queryEndpoint; + + @Test + public void testQueryInString() throws InterruptedException { + queryEndpoint.expectedMessageCount(1); + + template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ"); + + assertMockEndpointsSatisfied(); + + List list = queryEndpoint.getReceivedExchanges().get(0).getIn().getBody(List.class); + assertEquals(2, list.size()); + Map row = (Map) list.get(0); + assertEquals("Camel", row.get("PROJECT")); + row = (Map) list.get(1); + assertEquals("AMQ", row.get("PROJECT")); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(EmbeddedDatabaseType.HSQL, "sql/createAndPopulateDatabase.sql"); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:query") + .to("sql:classpath:sql/selectProjectsIn.sql") + .to("log:query") + .to("mock:query"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputHeaderTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputHeaderTest.java new file mode 100644 index 0000000..42719eb --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputHeaderTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerOutputHeaderTest.class, + SqlProducerOutputHeaderTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerOutputHeaderTest extends BaseSql { + + @EndpointInject("mock:query") + private MockEndpoint queryEndpoint; + + @Test + public void testQueryOutputHeader() throws InterruptedException { + queryEndpoint.expectedMessageCount(1); + queryEndpoint.expectedHeaderReceived(SqlConstants.SQL_ROW_COUNT, 1); + queryEndpoint.expectedHeaderReceived("MaxProjectID", 3); + queryEndpoint.message(0).body().isEqualTo("Hi there!"); + + template.requestBody("direct:query", "Hi there!"); + + assertMockEndpointsSatisfied(); + } + + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:query").to("sql:select max(id) from projects?outputType=SelectOne&outputHeader=MaxProjectID") + .to("mock:query"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java new file mode 100644 index 0000000..4d1b0c7 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerOutputTypeStreamListTest.class, + SqlProducerOutputTypeStreamListTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerOutputTypeStreamListTest extends BaseSql { + + + @EndpointInject("mock:result") + private MockEndpoint resultEndpoint; + + @Test + public void testSplit() throws Exception { + resultEndpoint.expectedMessageCount(3); + + template.sendBody("direct:withSplit", "testmsg"); + + assertMockEndpointsSatisfied(); + assertThat(resultBodyAt(resultEndpoint, 0), instanceOf(Map.class)); + assertThat(resultBodyAt(resultEndpoint, 1), instanceOf(Map.class)); + assertThat(resultBodyAt(resultEndpoint, 2), instanceOf(Map.class)); + } + + private Object resultBodyAt(MockEndpoint result, int index) { + return result.assertExchangeReceived(index).getIn().getBody(); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:withSplit") + .to("sql:select * from projects order by id?outputType=StreamList") + .to("log:stream") + .split(body()).streaming() + .to("log:row") + .to("mock:result") + .end(); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerToDTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerToDTest.java new file mode 100644 index 0000000..199e029 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerToDTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerToDTest.class, + SqlProducerToDTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerToDTest extends BaseSql { + + @EndpointInject("mock:query") + private MockEndpoint queryEndpoint; + + @Test + public void testToD() throws InterruptedException { + queryEndpoint.expectedMessageCount(1); + + template.requestBodyAndHeader("direct:query", "Hi there!", "foo", "AMQ"); + + assertMockEndpointsSatisfied(); + + List list = queryEndpoint.getReceivedExchanges().get(0).getIn().getBody(List.class); + assertEquals(1, list.size()); + Map row = (Map) list.get(0); + assertEquals("AMQ", row.get("PROJECT")); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean(name = "myDS") + public DataSource dataSource() { + return initDb(); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:query") + .setHeader("myQuery", constant("select * from projects where project = :#foo order by id")) + .toD("sql:${header.myQuery}?dataSource=#myDS") + .to("log:query") + .to("mock:query"); + } + }; + } + } + +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUpdateHeadersTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUpdateHeadersTest.java new file mode 100644 index 0000000..ebb0f7a --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUpdateHeadersTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerUpdateHeadersTest.class, + SqlProducerUpdateHeadersTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerUpdateHeadersTest extends BaseSql { + + @EndpointInject("mock:update") + private MockEndpoint updateEndpoint; + + @Test + public void testUpdateNoop() throws InterruptedException { + updateEndpoint.expectedMessageCount(1); + updateEndpoint.expectedHeaderReceived(SqlConstants.SQL_UPDATE_COUNT, 1); + updateEndpoint.message(0).body().isEqualTo("Hi there!"); + + template.requestBody("direct:update", "Hi there!"); + + assertMockEndpointsSatisfied(); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:update").to("sql:update projects set license='MIT' where id=3").to("mock:update"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java new file mode 100644 index 0000000..449a2d9 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlProducerUseMessageBodyForSqlTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlProducerUseMessageBodyForSqlTest.class, + SqlProducerUseMessageBodyForSqlTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlProducerUseMessageBodyForSqlTest extends BaseSql { + + @EndpointInject("mock:result") + private MockEndpoint resultEndpoint; + + @EndpointInject("mock:resultInsert") + private MockEndpoint resultInsertEndpoint; + + @BeforeEach + public void resetMock() throws Exception { + resultEndpoint.reset(); + resultInsertEndpoint.reset(); + } + + @Test + public void testUseMessageBodyForSqlAndHeaderParams() throws Exception { + resultEndpoint.reset(); + resultEndpoint.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", null, "lic", "ASF"); + + List<?> received = assertIsInstanceOf(List.class, resultEndpoint.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(2, received.size()); + Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("Camel", row.get("PROJECT")); + + row = assertIsInstanceOf(Map.class, received.get(1)); + assertEquals("AMQ", row.get("PROJECT")); + } + + + @Test + @SuppressWarnings({ "unchecked", "deprecated" }) + public void testUseMessageBodyForSqlAndCamelSqlParametersBatch() throws Exception { + + resultInsertEndpoint.expectedMessageCount(1); + + List<Map<String, Object>> rows = new ArrayList<>(); + Map<String, Object> row = new HashMap<>(); + row.put("id", 200); + row.put("project", "MyProject1"); + row.put("lic", "OPEN1"); + rows.add(row); + row = new HashMap<>(); + row.put("id", 201); + row.put("project", "MyProject2"); + row.put("lic", "OPEN1"); + rows.add(row); + template.sendBodyAndHeader("direct:insert", null, SqlConstants.SQL_PARAMETERS, rows); + + String origSql = assertIsInstanceOf(String.class, resultInsertEndpoint.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals("insert into projects(id, project, license) values(:?id,:?project,:?lic)", origSql); + + assertEquals(null, resultInsertEndpoint.getReceivedExchanges().get(0).getOut().getBody()); + + // Clear and then use route2 to verify result of above insert select + context.removeRoute(context.getRoutes().get(0).getId()); + + resultEndpoint.reset(); + resultEndpoint.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", null, "lic", "OPEN1"); + + List<?> received = assertIsInstanceOf(List.class, resultEndpoint.getReceivedExchanges().get(0).getIn().getBody()); + assertEquals(2, received.size()); + row = assertIsInstanceOf(Map.class, received.get(0)); + assertEquals("MyProject1", row.get("PROJECT")); + + row = assertIsInstanceOf(Map.class, received.get(1)); + assertEquals("MyProject2", row.get("PROJECT")); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initDb(); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .setBody(constant("select * from projects where license = :?lic order by id")) + .to("sql://query?useMessageBodyForSql=true") + .to("mock:result"); + + + from("direct:insert").routeId("baz") + .setBody(constant("insert into projects(id, project, license) values(:?id,:?project,:?lic)")) + .to("sql://query?useMessageBodyForSql=true&batch=true") + .to("mock:resultInsert"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlTransactedRouteTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlTransactedRouteTest.java new file mode 100644 index 0000000..8bb911a --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/SqlTransactedRouteTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.Configuration; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.Registry; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.spring.spi.SpringTransactionPolicy; +import org.apache.camel.support.SimpleRegistry; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + SqlTransactedRouteTest.class, + SqlTransactedRouteTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class SqlTransactedRouteTest extends BaseSql { + + private JdbcTemplate jdbc; + + private static String startEndpoint = "direct:start"; + private static String sqlEndpoint = "sql:overriddenByTheHeader?dataSource=#testdb"; + + @Autowired + private DataSource ds; + + @Autowired + private PlatformTransactionManager txMgr; + + @BeforeEach + public void setUp() throws Exception { + jdbc = new JdbcTemplate(ds); + jdbc.execute("CREATE TABLE CUSTOMER (ID VARCHAR(15) NOT NULL PRIMARY KEY, NAME VARCHAR(100))"); + } + + @Test + public void testCommit() throws Exception { + Exchange exchange = template.send(startEndpoint, e -> + e.getIn().setHeader(SqlConstants.SQL_QUERY, "insert into customer values('cust1','cmueller')") + + ); + + assertFalse(exchange.isFailed()); + + long count = jdbc.queryForObject("select count(*) from customer", Long.class); + assertEquals(2, count); + + Map<String, Object> map = jdbc.queryForMap("select * from customer where id = 'cust1'"); + assertEquals(2, map.size()); + assertEquals("cust1", map.get("ID")); + assertEquals("cmueller", map.get("NAME")); + + map = jdbc.queryForMap("select * from customer where id = 'cust2'"); + assertEquals(2, map.size()); + assertEquals("cust2", map.get("ID")); + assertEquals("muellerc", map.get("NAME")); + } + + @Test + public void testRollbackAfterAnException() throws Exception { + Exchange exchange = template.send("direct:start2", new Processor() { + @Override + public void process(Exchange exchange) { + exchange.getIn().setHeader(SqlConstants.SQL_QUERY, "insert into customer values('cust1','cmueller')"); + } + }); + + assertTrue(exchange.isFailed()); + + long count = jdbc.queryForObject("select count(*) from customer", Long.class); + assertEquals(0, count); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean("testdb") + public DataSource dataSource() { + return initEmptyDb(); + } + + @Bean(name = "required") + public SpringTransactionPolicy transactionManager(PlatformTransactionManager platformTransactionManager) { + SpringTransactionPolicy txPolicy = new SpringTransactionPolicy(); + txPolicy.setTransactionManager(platformTransactionManager); + txPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED"); + return txPolicy; + } + + @Bean(name = "txManager") + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("commit") + .transacted("required") + .to(sqlEndpoint) + .process(e -> e.getIn().setHeader(SqlConstants.SQL_QUERY, + "insert into customer values('cust2','muellerc')") + ) + .to(sqlEndpoint); + + from("direct:start2").routeId("rollback2") + .transacted("required") + .to(sqlEndpoint) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new Exception("forced Exception"); + } + }); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/HeaderDto.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/HeaderDto.java new file mode 100644 index 0000000..f58785e --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/HeaderDto.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.aggregation; + +import java.io.Serializable; + +public class HeaderDto implements Cloneable, Serializable { + private String org; + private String type; + private int key; + + public HeaderDto(String org, String type, int key) { + this.org = org; + this.type = type; + this.key = key; + } + + public int getKey() { + return key; + } + + public void setKey(int key) { + this.key = key; + } + + public String getOrg() { + return org; + } + + public void setOrg(String org) { + this.org = org; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public String toString() { + return "HeaderDto [org=" + org + ", type=" + type + ", key=" + key + "]"; + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateRecoverDeadLetterChannelTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateRecoverDeadLetterChannelTest.java new file mode 100644 index 0000000..bf2c62e --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateRecoverDeadLetterChannelTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.aggregation; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; +import java.util.concurrent.TimeUnit; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + JdbcAggregateRecoverDeadLetterChannelTest.class, + JdbcAggregateRecoverDeadLetterChannelTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class JdbcAggregateRecoverDeadLetterChannelTest extends BaseSql { + + @EndpointInject("mock:result") + protected MockEndpoint resultEndpoint; + + @EndpointInject("mock:aggregated") + protected MockEndpoint aggregatedEndpoint; + + @EndpointInject("mock:dead") + protected MockEndpoint deadEndpoint; + + @Test + public void testJdbcAggregateRecoverDeadLetterChannel() throws Exception { + // should fail all times + resultEndpoint.expectedMessageCount(0); + aggregatedEndpoint.expectedMessageCount(4); + deadEndpoint.expectedBodiesReceived("ABCDE"); + deadEndpoint.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); + deadEndpoint.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "D", "id", 123); + template.sendBodyAndHeader("direct:start", "E", "id", 123); + + assertMockEndpointsSatisfied(); + } + + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public DataSource dataSource() { + return initDb("sql/init.sql"); + } + + @Bean + public JdbcAggregationRepository jdbcAggregationRepository(DataSource dataSource, PlatformTransactionManager transactionManager) { + JdbcAggregationRepository repo = new JdbcAggregationRepository(transactionManager, "aggregationRepo1", dataSource); + + // enable recovery + repo.setUseRecovery(true); + // exhaust after at most 3 attempts + repo.setMaximumRedeliveries(3); + // and move to this dead letter channel + repo.setDeadLetterUri("mock:dead"); + // check faster + repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS); + + return repo; + } + + @Bean + public RouteBuilder routeBuilder(JdbcAggregationRepository repo) { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + deadLetterChannel("mock:error"); + + from("direct:start") + .aggregate(header("id"), new MyAggregationStrategy()) + .completionSize(5).aggregationRepository(repo) + .log("aggregated exchange id ${exchangeId} with ${body}") + .to("mock:aggregated") + .throwException(new IllegalArgumentException("Damn")) + .to("mock:result") + .end(); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateSerializedHeadersTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateSerializedHeadersTest.java new file mode 100644 index 0000000..b17c539 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateSerializedHeadersTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.aggregation; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.support.lob.AbstractLobHandler; +import org.springframework.jdbc.support.lob.DefaultLobHandler; +import org.springframework.jdbc.support.lob.LobCreator; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; +import java.io.InputStream; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + JdbcAggregateSerializedHeadersTest.class, + JdbcAggregateSerializedHeadersTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class JdbcAggregateSerializedHeadersTest extends BaseSql { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregateSerializedHeadersTest.class); + private static final int SIZE = 500; + + @EndpointInject("mock:result") + protected MockEndpoint resultEndpoint; + + @Test + public void testLoadTestJdbcAggregate() throws Exception { + resultEndpoint.expectedMinimumMessageCount(1); + resultEndpoint.setResultWaitTime(50 * 1000); + + LOG.info("Staring to send " + SIZE + " messages."); + + for (int i = 0; i < SIZE; i++) { + final int value = 1; + HeaderDto headerDto = new HeaderDto("org", "company", 1); + LOG.debug("Sending {} with id {}", value, headerDto); + template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", headerDto); + } + + LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); + + assertMockEndpointsSatisfied(); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public DataSource dataSource() { + return initDb("sql/init.sql"); + } + + @Bean + public JdbcAggregationRepository jdbcAggregationRepository(DataSource dataSource, PlatformTransactionManager transactionManager) { + JdbcAggregationRepository repo = new JdbcAggregationRepository(transactionManager, "aggregationRepo1", dataSource); + + repo.setAllowSerializedHeaders(true); + + return repo; + } + + @Bean + public RouteBuilder routeBuilder(JdbcAggregationRepository repo) { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start?size=" + SIZE) + .to("log:input?groupSize=500") + .aggregate(header("id"), new MyAggregationStrategy()) + .aggregationRepository(repo) + .completionSize(SIZE) + .to("log:output?showHeaders=true") + .to("mock:result") + .end(); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateStoreAsTextTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateStoreAsTextTest.java new file mode 100644 index 0000000..9b5667a --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/JdbcAggregateStoreAsTextTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.aggregation; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + JdbcAggregateStoreAsTextTest.class, + JdbcAggregateStoreAsTextTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class JdbcAggregateStoreAsTextTest extends BaseSql { + protected JdbcTemplate jdbcTemplate; + + @EndpointInject("mock:result") + protected MockEndpoint resultEndpoint; + + @Autowired + private DataSource ds; + + @BeforeEach + public void setUp() throws Exception { + jdbcTemplate = new JdbcTemplate(ds); + jdbcTemplate.afterPropertiesSet(); + } + + @Test + public void testStoreBodyAsTextAndCompanyNameHeaderAndAccountNameHeader() throws Exception { + resultEndpoint.expectedBodiesReceived("ABCDE"); + + + + Map<String, Object> headers = new HashMap<>(); + headers.put("id", 123); + headers.put("companyName", "Acme"); + headers.put("accountName", "Alan"); + + template.sendBodyAndHeaders("direct:start", "A", headers); + assertEquals("A", getAggregationRepositoryBody(123)); + assertEquals("Acme", getAggregationRepositoryCompanyName(123)); + assertEquals("Alan", getAggregationRepositoryAccountName(123)); + + template.sendBodyAndHeaders("direct:start", "B", headers); + assertEquals("AB", getAggregationRepositoryBody(123)); + assertEquals("Acme", getAggregationRepositoryCompanyName(123)); + assertEquals("Alan", getAggregationRepositoryAccountName(123)); + + template.sendBodyAndHeaders("direct:start", "C", headers); + assertEquals("ABC", getAggregationRepositoryBody(123)); + assertEquals("Acme", getAggregationRepositoryCompanyName(123)); + assertEquals("Alan", getAggregationRepositoryAccountName(123)); + + template.sendBodyAndHeaders("direct:start", "D", headers); + assertEquals("ABCD", getAggregationRepositoryBody(123)); + assertEquals("Acme", getAggregationRepositoryCompanyName(123)); + assertEquals("Alan", getAggregationRepositoryAccountName(123)); + + template.sendBodyAndHeaders("direct:start", "E", headers); + + assertMockEndpointsSatisfied(); + } + + public String getAggregationRepositoryBody(int id) { + return getAggregationRepositoryColumn(id, "body"); + } + + public String getAggregationRepositoryCompanyName(int id) { + return getAggregationRepositoryColumn(id, "companyName"); + } + + public String getAggregationRepositoryAccountName(int id) { + return getAggregationRepositoryColumn(id, "accountName"); + } + + public String getAggregationRepositoryColumn(int id, String columnName) { + return jdbcTemplate.queryForObject("SELECT " + columnName + " from aggregationRepo3 where id = ?", String.class, id); + } + + @Configuration + public static class TestConfiguration { + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public DataSource dataSource() { + return initDb("sql/init3.sql"); + } + + @Bean + public JdbcAggregationRepository jdbcAggregationRepository(DataSource dataSource, PlatformTransactionManager transactionManager) { + JdbcAggregationRepository repo = new JdbcAggregationRepository(transactionManager, "aggregationRepo3", dataSource); + + repo.setStoreBodyAsText(true); + repo.setHeadersToStoreAsText(Arrays.asList("companyName", "accountName")); + + return repo; + } + + @Bean + public RouteBuilder routeBuilder(JdbcAggregationRepository repo) { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new MyAggregationStrategy()) + .aggregationRepository(repo) + .completionSize(5) + .to("log:output?showHeaders=true") + .to("mock:result") + .end(); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/MyAggregationStrategy.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/MyAggregationStrategy.java new file mode 100644 index 0000000..9740f54 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/aggregation/MyAggregationStrategy.java @@ -0,0 +1,19 @@ +package org.apache.camel.component.sql.aggregation; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; + +public class MyAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String body1 = oldExchange.getIn().getBody(String.class); + String body2 = newExchange.getIn().getBody(String.class); + + oldExchange.getIn().setBody(body1 + body2); + return oldExchange; + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/CustomizedJdbcMessageIdRepositoryTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/CustomizedJdbcMessageIdRepositoryTest.java new file mode 100644 index 0000000..457a80c --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/CustomizedJdbcMessageIdRepositoryTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.idempotent; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CustomizedJdbcMessageIdRepositoryTest.class, + CustomizedJdbcMessageIdRepositoryTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class CustomizedJdbcMessageIdRepositoryTest extends BaseSql { + + protected static final String SELECT_ALL_STRING + = "SELECT messageId FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ?"; + protected static final String PROCESSOR_NAME = "myProcessorName"; + + protected JdbcTemplate jdbcTemplate; + + @EndpointInject("mock:result") + protected MockEndpoint resultEndpoint; + + @EndpointInject("mock:error") + protected MockEndpoint errorEndpoint; + + @Autowired + private DataSource dataSource; + + @BeforeEach + public void setUp() throws Exception { + jdbcTemplate = new JdbcTemplate(dataSource); + jdbcTemplate.afterPropertiesSet(); + } + + @Test + public void testDuplicateMessagesAreFilteredOut() throws Exception { + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + + assertMockEndpointsSatisfied(); + + // all 3 messages should be in jdbc repo + List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME); + + assertEquals(3, receivedMessageIds.size()); + assertTrue(receivedMessageIds.contains("1")); + assertTrue(receivedMessageIds.contains("2")); + assertTrue(receivedMessageIds.contains("3")); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initEmptyDb(); + } + + + @Bean + public RouteBuilder routeBuilder(DataSource dataSource) { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + deadLetterChannel("mock:error"); + + JdbcMessageIdRepository repo = new JdbcMessageIdRepository(dataSource, PROCESSOR_NAME); + repo.setTableExistsString("SELECT 1 FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0"); + repo.setCreateString("CREATE TABLE CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)"); + repo.setQueryString("SELECT COUNT(*) FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"); + repo.setInsertString("INSERT INTO CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)"); + repo.setDeleteString("DELETE FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"); + + from("direct:start") + .idempotentConsumer(header("messageId"), repo) + .to("mock:result"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcCachedMessageIdRepositoryTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcCachedMessageIdRepositoryTest.java new file mode 100644 index 0000000..48638d4 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcCachedMessageIdRepositoryTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.idempotent; + +import org.apache.camel.CamelContext; +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.processor.idempotent.jdbc.JdbcCachedMessageIdRepository; +import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.sql.Timestamp; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + JdbcCachedMessageIdRepositoryTest.class, + JdbcCachedMessageIdRepositoryTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class JdbcCachedMessageIdRepositoryTest extends BaseSql { + + private static final String INSERT_STRING + = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)"; + private static final String PROCESSOR_NAME = "myProcessorName"; + + private JdbcTemplate jdbcTemplate; + private DataSource dataSource; + private JdbcCachedMessageIdRepository repository; + + @EndpointInject("mock:result") + private MockEndpoint resultEndpoint; + + @EndpointInject("mock:error") + private MockEndpoint errorEndpoint; + + @Autowired + private DataSource ds; + + @BeforeEach + public void setUp() throws Exception { + jdbcTemplate = new JdbcTemplate(ds); + jdbcTemplate.afterPropertiesSet(); + jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "1", new Timestamp(System.currentTimeMillis())); + jdbcTemplate.update(INSERT_STRING, PROCESSOR_NAME, "2", new Timestamp(System.currentTimeMillis())); + repository = context.getRegistry().lookupByNameAndType(PROCESSOR_NAME, JdbcCachedMessageIdRepository.class); + repository.reload(); + } + + @Test + public void testCacheHit() throws Exception { + resultEndpoint.expectedBodiesReceived("three"); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + + assertMockEndpointsSatisfied(); + + assertEquals(5, repository.getHitCount()); + assertEquals(1, repository.getMissCount()); + } + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initEmptyDb(); + } + + @Bean + public RouteBuilder routeBuilder(DataSource dataSource, CamelContext context) { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + deadLetterChannel("mock:error"); + + JdbcMessageIdRepository repo = new JdbcCachedMessageIdRepository(dataSource, PROCESSOR_NAME); + context.getRegistry().bind(PROCESSOR_NAME, repo); + from("direct:start") + .idempotentConsumer(header("messageId"), repo) + .to("mock:result"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcMessageIdRepositoryTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcMessageIdRepositoryTest.java new file mode 100644 index 0000000..643a231 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/idempotent/JdbcMessageIdRepositoryTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql.idempotent; + +import org.apache.camel.Configuration; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.annotation.DirtiesContext; + +import javax.sql.DataSource; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + JdbcMessageIdRepositoryTest.class, + JdbcMessageIdRepositoryTest.TestConfiguration.class, + BaseSql.TestConfiguration.class + } +) +public class JdbcMessageIdRepositoryTest extends BaseSql { + + protected static final String SELECT_ALL_STRING = "SELECT messageId FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?"; + protected static final String CLEAR_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?"; + protected static final String PROCESSOR_NAME = "myProcessorName"; + + private JdbcTemplate jdbcTemplate; + + @EndpointInject("mock:result") + private MockEndpoint resultEndpoint; + + @EndpointInject("mock:error") + private MockEndpoint errorEndpoint; + + @Autowired + private DataSource ds; + + @BeforeEach + public void setUp() throws Exception { + jdbcTemplate = new JdbcTemplate(ds); + jdbcTemplate.afterPropertiesSet(); + } + + @Test + public void testDuplicateMessagesAreFilteredOut() throws Exception { + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + + assertMockEndpointsSatisfied(); + + // all 3 messages should be in jdbc repo + List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME); + + assertEquals(3, receivedMessageIds.size()); + assertTrue(receivedMessageIds.contains("1")); + assertTrue(receivedMessageIds.contains("2")); + assertTrue(receivedMessageIds.contains("3")); + } + + + // ************************************* + // Config + // ************************************* + + @Configuration + public static class TestConfiguration { + + @Bean + public DataSource dataSource() { + return initEmptyDb(); + } + + @Bean + public RouteBuilder routeBuilder(DataSource dataSource) { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + deadLetterChannel("mock:error"); + + JdbcMessageIdRepository repo = new JdbcMessageIdRepository(dataSource, PROCESSOR_NAME); + from("direct:start") + .idempotentConsumer(header("messageId"), repo) + .to("mock:result"); + } + }; + } + } +} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/support/DummyJDBCDriver.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/support/DummyJDBCDriver.java deleted file mode 100644 index 0982bc5..0000000 --- a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/component/sql/support/DummyJDBCDriver.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.sql.support; - -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverPropertyInfo; -import java.sql.SQLException; -import java.sql.SQLFeatureNotSupportedException; -import java.util.Properties; -import java.util.logging.Logger; - -/** - * Dummy JDBCDriver to be used in tests. - */ -public class DummyJDBCDriver implements Driver { - - @Override - public Connection connect(String s, Properties properties) throws SQLException { - return null; - } - - @Override - public boolean acceptsURL(String s) throws SQLException { - return false; - } - - @Override - public DriverPropertyInfo[] getPropertyInfo(String s, Properties properties) throws SQLException { - return new DriverPropertyInfo[0]; - } - - @Override - public int getMajorVersion() { - return 0; - } - - @Override - public int getMinorVersion() { - return 0; - } - - @Override - public boolean jdbcCompliant() { - return false; - } - - @Override - public Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } -} diff --git a/components-starter/camel-sql-starter/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java new file mode 100644 index 0000000..710bf11 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.jdbc; + +import org.apache.camel.Configuration; +import org.apache.camel.component.sql.BaseSql; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository; +import org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository; +import org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository.ProcessorNameAndMessageId; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +import java.sql.Timestamp; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@TestInstance(Lifecycle.PER_CLASS) +public class JdbcOrphanLockAwareIdempotentRepositoryTest extends BaseSql { + + private static final String APP_NAME = "APP_1"; + + private EmbeddedDatabase dataSource; + + private JdbcOrphanLockAwareIdempotentRepository jdbcMessageIdRepository; + + @BeforeAll + public void setup() throws Exception { + dataSource = initDb(EmbeddedDatabaseType.HSQL, "sql/idempotentWithOrphanLockRemoval.sql"); + jdbcMessageIdRepository = new JdbcOrphanLockAwareIdempotentRepository(dataSource, APP_NAME, new DefaultCamelContext()); + jdbcMessageIdRepository.setLockMaxAgeMillis(3000_00L); + jdbcMessageIdRepository.setLockKeepAliveIntervalMillis(3000L); + jdbcMessageIdRepository.doInit(); + } + + @Test + public void testLockNotGrantedForCurrentTimeStamp() { + assertTrue(jdbcMessageIdRepository.contains("FILE_1")); + } + + @Test + public void testLockNotGrantedForCurrentTimeStampPlus2Min() { + assertTrue(jdbcMessageIdRepository.contains("FILE_2")); + } + + @Test + public void testLockGrantedForCurrentTimeStampPlus5Min() { + assertFalse(jdbcMessageIdRepository.contains("FILE_3")); + } + + @Test + public void testLockKeepAliveWorks() { + assertFalse(jdbcMessageIdRepository.contains("FILE_4")); + jdbcMessageIdRepository.insert("FILE_4"); + assertTrue(jdbcMessageIdRepository.contains("FILE_4")); + JdbcTemplate template = new JdbcTemplate(dataSource); + Timestamp timestamp = new Timestamp(System.currentTimeMillis() - 5 * 60 * 1000L); + template.update("UPDATE CAMEL_MESSAGEPROCESSED SET createdAT = ? WHERE processorName = ? AND messageId = ?", timestamp, + APP_NAME, "FILE_4"); + + await().atMost(5, TimeUnit.SECONDS).until(() -> !jdbcMessageIdRepository.contains("FILE_4")); + jdbcMessageIdRepository.keepAlive(); + assertTrue(jdbcMessageIdRepository.contains("FILE_4")); + } + + @Test + public void testInsertQueryDelete() { + assertFalse(jdbcMessageIdRepository.contains("FILE_5")); + assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet() + .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5"))); + + jdbcMessageIdRepository.add("FILE_5"); + + assertTrue(jdbcMessageIdRepository.getProcessorNameMessageIdSet() + .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5"))); + assertTrue(jdbcMessageIdRepository.contains("FILE_5")); + jdbcMessageIdRepository.remove("FILE_5"); + assertFalse(jdbcMessageIdRepository.contains("FILE_5")); + assertFalse(jdbcMessageIdRepository.getProcessorNameMessageIdSet() + .contains(new ProcessorNameAndMessageId(APP_NAME, "FILE_5"))); + } +} diff --git a/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase.sql b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase.sql new file mode 100644 index 0000000..cc9e4b2 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase.sql @@ -0,0 +1,23 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- START SNIPPET: e1 +create table projects (id integer primary key, project varchar(10), license varchar(5)); +insert into projects values (1, 'Camel', 'ASF'); +insert into projects values (2, 'AMQ', 'ASF'); +insert into projects values (3, 'Linux', 'XXX'); +-- END SNIPPET: e1 \ No newline at end of file diff --git a/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase3.sql b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase3.sql new file mode 100644 index 0000000..2499f1c --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/resources/sql/createAndPopulateDatabase3.sql @@ -0,0 +1,27 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- START SNIPPET: e1 +create table projects (id integer primary key GENERATED ALWAYS AS IDENTITY, project varchar(10), license varchar(5), description varchar(1000) default null); +insert into projects (project, license, description) values ('Camel', 'ASF', ''); +insert into projects (project, license, description) values ('AMQ', 'ASF', ''); +insert into projects (project, license, description) values ('Linux', 'XXX', ''); +-- END SNIPPET: e1 + +-- START SNIPPET: e2 +create table developers (id1 integer primary key GENERATED ALWAYS AS IDENTITY (START WITH 5), name varchar(20), position varchar(20), id2 integer GENERATED ALWAYS AS (id1+1)); +-- END SNIPPET: e2 \ No newline at end of file diff --git a/components-starter/camel-sql-starter/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql b/components-starter/camel-sql-starter/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql new file mode 100644 index 0000000..ce108eb --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/resources/sql/idempotentWithOrphanLockRemoval.sql @@ -0,0 +1,14 @@ +-- Add DDL to create tables, views, indexes, etc needed by tests. These should match the expected database structure as it will appear in production. +SET DATABASE SQL SYNTAX PGS TRUE; -- tells HSQLDB that this schema uses MYSQL syntax +SET PROPERTY "sql.enforce_strict_size" FALSE; + +CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP); + +ALTER TABLE CAMEL_MESSAGEPROCESSED ADD PRIMARY KEY (processorName, messageId); + + +INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_1', CURRENT_TIMESTAMP); + +INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_2',TIMESTAMPADD(SQL_TSI_MINUTE, -2, CURRENT_TIMESTAMP)); + +INSERT INTO CAMEL_MESSAGEPROCESSED VALUES ('APP_1', 'FILE_3',TIMESTAMPADD(SQL_TSI_MINUTE, -5, CURRENT_TIMESTAMP)); diff --git a/components-starter/camel-sql-starter/src/test/resources/sql/init.sql b/components-starter/camel-sql-starter/src/test/resources/sql/init.sql new file mode 100644 index 0000000..6e1b3f0 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/resources/sql/init.sql @@ -0,0 +1,29 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE aggregationRepo1 ( + id varchar(255) NOT NULL, + exchange blob NOT NULL, + version bigint NOT NULL, + constraint aggregationRepo1_pk PRIMARY KEY (id) +); +CREATE TABLE aggregationRepo1_completed ( + id varchar(255) NOT NULL, + exchange blob NOT NULL, + version bigint NOT NULL, + constraint aggregationRepo1_completed_pk PRIMARY KEY (id) +); \ No newline at end of file diff --git a/components-starter/camel-sql-starter/src/test/resources/sql/init3.sql b/components-starter/camel-sql-starter/src/test/resources/sql/init3.sql new file mode 100644 index 0000000..baad28a --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/resources/sql/init3.sql @@ -0,0 +1,35 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE aggregationRepo3 ( + id varchar(255) NOT NULL, + exchange blob NOT NULL, + version bigint NOT NULL, + body varchar(1000), + companyName varchar(1000), + accountName varchar(1000), + constraint aggregationRepo3_pk PRIMARY KEY (id) +); +CREATE TABLE aggregationRepo3_completed ( + id varchar(255) NOT NULL, + exchange blob NOT NULL, + version bigint NOT NULL, + body varchar(1000), + companyName varchar(1000), + accountName varchar(1000), + constraint aggregationRepo3_completed_pk PRIMARY KEY (id) +); \ No newline at end of file diff --git a/components-starter/camel-sql-starter/src/test/resources/sql/selectProjectsIn.sql b/components-starter/camel-sql-starter/src/test/resources/sql/selectProjectsIn.sql new file mode 100644 index 0000000..7f5f590 --- /dev/null +++ b/components-starter/camel-sql-starter/src/test/resources/sql/selectProjectsIn.sql @@ -0,0 +1,22 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- this is a comment +select * +from projects +where project in (:#in:names) +order by id \ No newline at end of file