Updated Branches: refs/heads/camel-2.12.x ab25eccc4 -> b024af4d4
CAMEL-6782: Added test demonstrating the failover of Camel routing inside a clustered quartz setup. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b024af4d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b024af4d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b024af4d Branch: refs/heads/camel-2.12.x Commit: b024af4d4392bce9e2e06bb2c398f8559a8803f9 Parents: ab25ecc Author: Babak Vahdat <bvah...@apache.org> Authored: Wed Sep 25 00:29:24 2013 +0200 Committer: Babak Vahdat <bvah...@apache.org> Committed: Wed Sep 25 00:31:12 2013 +0200 ---------------------------------------------------------------------- .../quartz/ScheduledRoutePolicy.java | 6 +- ...ngQuartzPersistentStoreClusteredAppTest.java | 85 ------------------ ...pringQuartzTwoAppsClusteredFailoverTest.java | 94 ++++++++++++++++++++ .../quartz/SpringQuartzClusteredAppOneTest.xml | 26 +++--- .../quartz/SpringQuartzClusteredAppTwoTest.xml | 28 +++--- .../quartz/SpringQuartzEmbeddedDatabase.xml | 31 +++++++ 6 files changed, 155 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b024af4d/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java b/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java index 013bbb9..a22e397 100644 --- a/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java +++ b/components/camel-quartz/src/main/java/org/apache/camel/routepolicy/quartz/ScheduledRoutePolicy.java @@ -90,11 +90,11 @@ public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements JobDetail existingJobDetail = getScheduler().getJobDetail(jobDetail.getName(), jobDetail.getGroup()); if (jobDetail.equals(existingJobDetail)) { if (LOG.isInfoEnabled()) { - LOG.info("Skipping to schedule the job: {} for action: {} on route {} as the job: {} already existing!", + LOG.info("Skipping to schedule the job: {} for action: {} on route {} as the job: {} already existing inside the cluster", new Object[] {jobDetail.getFullName(), action, route.getId(), existingJobDetail.getFullName()}); } - // skip scheduling the same job as one is already available for the same route and action + // skip scheduling the same job again as one is already existing for the same routeId and action return; } } @@ -150,7 +150,7 @@ public abstract class ScheduledRoutePolicy extends RoutePolicySupport implements getScheduler().deleteJob(jobDetailName, jobDetailGroup); } - LOG.debug("Scheduled Job: {}.{} is deleted", jobDetailGroup, jobDetailName); + LOG.debug("Scheduled job: {}.{} is deleted", jobDetailGroup, jobDetailName); } protected JobDetail createJobDetail(Action action, Route route) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/b024af4d/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzPersistentStoreClusteredAppTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzPersistentStoreClusteredAppTest.java b/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzPersistentStoreClusteredAppTest.java deleted file mode 100644 index 26551cd..0000000 --- a/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzPersistentStoreClusteredAppTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.routepolicy.quartz; - -import org.apache.camel.CamelContext; -import org.apache.camel.CamelExecutionException; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.component.direct.DirectConsumerNotAvailableException; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.TestSupport; - -import org.junit.Test; - -import org.springframework.context.support.AbstractXmlApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; - -/** - * @version - */ -public class SpringQuartzPersistentStoreClusteredAppTest extends TestSupport { - - @Test - public void testQuartzPersistentStoreClusteredApp() throws Exception { - // boot up the first clustered app which also launches an embedded database - AbstractXmlApplicationContext app = new ClassPathXmlApplicationContext("org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml"); - app.start(); - - // and now the second one - AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml"); - app2.start(); - - CamelContext camel = app.getBean("camelContext", CamelContext.class); - assertNotNull(camel); - - MockEndpoint mock = camel.getEndpoint("mock:result", MockEndpoint.class); - mock.expectedMessageCount(1); - mock.expectedBodiesReceived("clustering pings!"); - - // wait a bit to make sure the route has been properly started through - // the given route policy - Thread.sleep(5000); - - app.getBean("template", ProducerTemplate.class).sendBody("direct:start", "clustering"); - - mock.assertIsSatisfied(); - - CamelContext camel2 = app2.getBean("camelContext", CamelContext.class); - assertNotNull(camel2); - - MockEndpoint mock2 = camel2.getEndpoint("mock:result", MockEndpoint.class); - mock2.expectedMessageCount(0); - - // expect no consumer being started as the seconds app is expected to - // run in standby modus - try { - app2.getBean("template", ProducerTemplate.class).sendBody("direct:start", "clustering"); - fail("Should have thrown exception"); - } catch (CamelExecutionException cee) { - assertIsInstanceOf(DirectConsumerNotAvailableException.class, cee.getCause()); - } - - mock2.assertIsSatisfied(); - - // we're done so let's properly close the application contexts, but stop - // the second app before the first one so that the quartz scheduler running - // inside it can properly be shutdown - app2.close(); - app.close(); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/b024af4d/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzTwoAppsClusteredFailoverTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzTwoAppsClusteredFailoverTest.java b/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzTwoAppsClusteredFailoverTest.java new file mode 100644 index 0000000..84b67f6 --- /dev/null +++ b/components/camel-quartz/src/test/java/org/apache/camel/routepolicy/quartz/SpringQuartzTwoAppsClusteredFailoverTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.routepolicy.quartz; + +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.TestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +/** + * @version + */ +public class SpringQuartzTwoAppsClusteredFailoverTest extends TestSupport { + + @Test + public void testQuartzPersistentStoreClusteredApp() throws Exception { + // boot up the database the two apps are going to share inside a clustered quartz setup + AbstractXmlApplicationContext db = new ClassPathXmlApplicationContext("org/apache/camel/routepolicy/quartz/SpringQuartzEmbeddedDatabase.xml"); + db.start(); + + // now launch the first clustered app + AbstractXmlApplicationContext app = new ClassPathXmlApplicationContext("org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml"); + app.start(); + + // as well as the second one + AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml"); + app2.start(); + + CamelContext camel = app.getBean("camelContext", CamelContext.class); + + MockEndpoint mock = camel.getEndpoint("mock:result", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived("clustering PINGS!"); + + // wait a bit to make sure the route has already been properly started through the given route policy + Thread.sleep(5000); + + app.getBean("template", ProducerTemplate.class).sendBody("direct:start", "clustering"); + + mock.assertIsSatisfied(); + + // now let's simulate a crash of the first app + log.warn("The first app is going to crash NOW!"); + app.close(); + + log.warn("Crashed..."); + log.warn("Crashed..."); + log.warn("Crashed..."); + + // wait long enough until the second app takes it over... + Thread.sleep(20000); + // inside the logs one can then clearly see how the route of the second CamelContext gets started: + // 2013-09-24 22:51:34,215 [main ] WARN ersistentStoreClusteredAppTest - Crashed... + // 2013-09-24 22:51:34,215 [main ] WARN ersistentStoreClusteredAppTest - Crashed... + // 2013-09-24 22:51:34,215 [main ] WARN ersistentStoreClusteredAppTest - Crashed... + // 2013-09-24 22:51:49,188 [_ClusterManager] INFO LocalDataSourceJobStore - ClusterManager: detected 1 failed or restarted instances. + // 2013-09-24 22:51:49,188 [_ClusterManager] INFO LocalDataSourceJobStore - ClusterManager: Scanning for instance "app-one"'s failed in-progress jobs. + // 2013-09-24 22:51:49,211 [eduler_Worker-1] INFO SpringCamelContext - Route: myRoute started and consuming from: Endpoint[direct://start] + + CamelContext camel2 = app2.getBean("camelContext2", CamelContext.class); + + MockEndpoint mock2 = camel2.getEndpoint("mock:result", MockEndpoint.class); + mock2.expectedMessageCount(1); + mock2.expectedBodiesReceived("clustering PONGS!"); + + app2.getBean("template", ProducerTemplate.class).sendBody("direct:start", "clustering"); + + mock2.assertIsSatisfied(); + + // stop the second app as we're already done + app2.close(); + + // and as the last step stop the database itself... + db.close(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b024af4d/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml b/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml index d9584dc..75f7c05 100644 --- a/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml +++ b/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppOneTest.xml @@ -21,23 +21,23 @@ xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd - http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd - "> + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> - <!-- the persistent store for quartz --> - <jdbc:embedded-database id="camel_quartz" type="DERBY"> - <jdbc:script location="classpath:tables_derby.sql"/> - </jdbc:embedded-database> + <bean id="quartzDataSource" class="org.apache.commons.dbcp.BasicDataSource"> + <property name="driverClassName" value="org.apache.derby.jdbc.EmbeddedDriver" /> + <!-- refer the embedded database we setup inside SpringQuartzEmbeddedDatabase.xml --> + <property name="url" value="jdbc:derby:memory:quartz-db" /> + <property name="username" value="sa" /> + <property name="password" value="" /> + </bean> <bean id="quartz" class="org.apache.camel.component.quartz.QuartzComponent"> <property name="scheduler" ref="scheduler"/> </bean> - <!-- jdbc:embedded-database must come before this so the job store exists --> <bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> - <property name="dataSource" ref="camel_quartz"/> + <property name="dataSource" ref="quartzDataSource"/> <property name="autoStartup" value="false"/> - <!-- let Camel start --> <property name="schedulerContextAsMap"> <!-- hook Camel into Quartz --> <map> @@ -47,11 +47,11 @@ <property name="quartzProperties"> <props> <prop key="org.quartz.scheduler.instanceName">myscheduler</prop> - <prop key="org.quartz.scheduler.instanceId">AUTO</prop> + <prop key="org.quartz.scheduler.instanceId">app-one</prop> <prop key="org.quartz.scheduler.skipUpdateCheck">true</prop> <prop key="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.StdJDBCDelegate</prop> <prop key="org.quartz.jobStore.isClustered">true</prop> - <prop key="org.quartz.jobStore.clusterCheckinInterval">1000</prop> + <prop key="org.quartz.jobStore.clusterCheckinInterval">5000</prop> </props> </property> </bean> @@ -60,13 +60,13 @@ <property name="routeStartTime" value="0/3 * * * * ?" /> </bean> - <camelContext id="camelContext" managementNamePattern="#name#-#counter#" xmlns="http://camel.apache.org/schema/spring"> + <camelContext id="camelContext" managementNamePattern="#name#" xmlns="http://camel.apache.org/schema/spring"> <template id="template" /> <route id="myRoute" routePolicyRef="startPolicy" autoStartup="false"> <from uri="direct:start" /> <to uri="log:triggered" /> <transform> - <simple>${in.body} pings!</simple> + <simple>${body} PINGS!</simple> </transform> <to uri="mock:result" /> </route> http://git-wip-us.apache.org/repos/asf/camel/blob/b024af4d/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml b/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml index b09dd44..6126464 100644 --- a/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml +++ b/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzClusteredAppTwoTest.xml @@ -21,37 +21,37 @@ xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd - http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd - "> + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> - <!-- the persistent store for quartz --> - <jdbc:embedded-database id="camel_quartz" type="DERBY"> - <!-- do not load script as database alreaady exists --> - </jdbc:embedded-database> + <bean id="quartzDataSource" class="org.apache.commons.dbcp.BasicDataSource"> + <property name="driverClassName" value="org.apache.derby.jdbc.EmbeddedDriver" /> + <!-- refer the embedded database we setup inside SpringQuartzEmbeddedDatabase.xml --> + <property name="url" value="jdbc:derby:memory:quartz-db" /> + <property name="username" value="sa" /> + <property name="password" value="" /> + </bean> <bean id="quartz" class="org.apache.camel.component.quartz.QuartzComponent"> <property name="scheduler" ref="scheduler"/> </bean> - <!-- jdbc:embedded-database must come before this so the job store exists --> <bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> - <property name="dataSource" ref="camel_quartz"/> + <property name="dataSource" ref="quartzDataSource"/> <property name="autoStartup" value="false"/> - <!-- let Camel start --> <property name="schedulerContextAsMap"> <!-- hook Camel into Quartz --> <map> - <entry key="CamelQuartzCamelContext" value-ref="camelContext"/> + <entry key="CamelQuartzCamelContext" value-ref="camelContext2"/> </map> </property> <property name="quartzProperties"> <props> <prop key="org.quartz.scheduler.instanceName">myscheduler</prop> - <prop key="org.quartz.scheduler.instanceId">AUTO</prop> + <prop key="org.quartz.scheduler.instanceId">app-two</prop> <prop key="org.quartz.scheduler.skipUpdateCheck">true</prop> <prop key="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.StdJDBCDelegate</prop> <prop key="org.quartz.jobStore.isClustered">true</prop> - <prop key="org.quartz.jobStore.clusterCheckinInterval">1000</prop> + <prop key="org.quartz.jobStore.clusterCheckinInterval">5000</prop> </props> </property> </bean> @@ -60,13 +60,13 @@ <property name="routeStartTime" value="0/3 * * * * ?" /> </bean> - <camelContext id="camelContext" managementNamePattern="#name#-#counter#" xmlns="http://camel.apache.org/schema/spring"> + <camelContext id="camelContext2" managementNamePattern="#name#" xmlns="http://camel.apache.org/schema/spring"> <template id="template" /> <route id="myRoute" routePolicyRef="startPolicy" autoStartup="false"> <from uri="direct:start" /> <to uri="log:triggered" /> <transform> - <simple>${in.body} pongs!</simple> + <simple>${body} PONGS!</simple> </transform> <to uri="mock:result" /> </route> http://git-wip-us.apache.org/repos/asf/camel/blob/b024af4d/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzEmbeddedDatabase.xml ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzEmbeddedDatabase.xml b/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzEmbeddedDatabase.xml new file mode 100644 index 0000000..83ae9c6 --- /dev/null +++ b/components/camel-quartz/src/test/resources/org/apache/camel/routepolicy/quartz/SpringQuartzEmbeddedDatabase.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:jdbc="http://www.springframework.org/schema/jdbc" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <!-- the embedded persistent storage for quartz --> + <jdbc:embedded-database id="quartz-db" type="DERBY"> + <jdbc:script location="classpath:tables_derby.sql"/> + </jdbc:embedded-database> + +</beans>