Hi Selva, Thanks for sharing your patch with us. Your patch looks good to me, and I managed to reproduce the error by doing some change in the SpringQuartzConsumerTwoAppsClusteredRecoveryTest. I will commit the patch shortly.
Regards, -- Willem Jiang Red Hat, Inc. Web: http://www.redhat.com Blog: http://willemjiang.blogspot.com (English) http://jnn.iteye.com (Chinese) Twitter: willemjiang Weibo: 姜宁willem On December 2, 2014 at 7:23:03 PM, selva (sgpsel...@gmail.com) wrote: > Hi Willem, > > I have downloaded(2.14.x from github) and debugged the camel quartz > component source code and found the reason for the issue i am facing. > > As i already posted i am facing issue in the cluster environment, deployed > the camel application as stand alone program running 2 instances. While > executing a route if one instance goes down the other instance should > trigger the same route immediately as we have configured > *recoverableJob=true* in the quartz2 endpoint . > > The issue i was facing like the second instance is triggering quartz2 > endpoint but not executing my process(QueryBuilderProcessor) . > > Example : > > > > uri="quartz2://cluster/quartz?cron=0+0/4+++*+?&durableJob=true&stateful=true&recoverableJob=true"> > > > > > > > While debugging i found that in camelJob.java, the triggerkey in the normal > and the triggerykey during recovery is different,so the below condition is > failing and going else block(Please check the highlighted else block). > > > public void execute(JobExecutionContext context) throws > JobExecutionException { > Exchange exchange = null; > try { > if (LOG.isDebugEnabled()) { > LOG.debug("Running CamelJob jobExecutionContext={}", > context); > } > > CamelContext camelContext = getCamelContext(context); > QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, > context);//* Step 1 : * > exchange = endpoint.createExchange(); > exchange.setIn(new QuartzMessage(exchange, context)); > endpoint.getConsumerLoadBalancer().process(exchange);//*Step :2* > > > > * Step 1 : * Quartz endpoint creation > > > if (triggerKey.equals(checkTriggerKey){ > return quartzEndpoint; > } > > > if (camelContext.hasEndpoint(endpointUri) != null) { > if (LOG.isDebugEnabled()) { > LOG.debug("Getting Endpoint from camelContext."); > } > result = camelContext.getEndpoint(endpointUri, > QuartzEndpoint.class); > } else { > LOG.warn("Cannot find existing QuartzEndpoint with uri: {}.* > Creating new endpoint instance.", endpointUri);* > result = camelContext.getEndpoint(endpointUri, > QuartzEndpoint.class); > } > > > > Since its creating new quartz endpoint , the consumerLoadBalancer Object is > null. > > So as next step its executing the below code and creating *New * > consumerLoadBalancer Object > > *Step :2* > > > public LoadBalancer getConsumerLoadBalancer() { > if (consumerLoadBalancer == null) { > consumerLoadBalancer = new RoundRobinLoadBalancer(); > } > return consumerLoadBalancer; > } > > Since its creating as a new Object ,the processors property is empty so > not calling my QueryBuilderProcessor Processor. > > In normal flow(not recovery) i can see list of processors details in the > processors property of consumerLoadBalancer . > > *Temporary Fix for Testing:* Below Code working fine in Recovery flow with > immediate retry. > > As a temporary fix we have modified code in the camelJob.java , > lookupQuartzEndpoint method like below > (Please check the highlighted text) > > protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, > JobExecutionContext quartzContext) throws JobExecutionException { > TriggerKey triggerKey = quartzContext.getTrigger().getKey(); > * JobDetail jobDetail = quartzContext.getJobDetail(); > JobKey jobKey = jobDetail.getKey(); > * > if (LOG.isDebugEnabled()) { > LOG.debug("Looking up existing QuartzEndpoint with > triggerKey={}", triggerKey); > } > > // check all active routes for the quartz endpoint this task matches > // as we prefer to use the existing endpoint from the routes > for (Route route : camelContext.getRoutes()) { > if (route.getEndpoint() instanceof QuartzEndpoint) { > QuartzEndpoint quartzEndpoint = (QuartzEndpoint) > route.getEndpoint(); > TriggerKey checkTriggerKey = quartzEndpoint.getTriggerKey(); > if (LOG.isTraceEnabled()) { > LOG.trace("Checking route endpoint={} with > checkTriggerKey={}", quartzEndpoint, checkTriggerKey); > } > if > (triggerKey.equals(checkTriggerKey)*||(jobDetail.requestsRecovery()== true > && > jobKey.getGroup().equals(checkTriggerKey.getGroup())&&jobKey.getName().equals(checkTriggerKey.getName()))) > > {* > return quartzEndpoint; > } > } > } > return quartzEndpoint; > } > > > Except the triggerKey remaning all jobdetails are same in the existing > quartzEndpoint so we put the condition check to return the existing quartz > endpoint instead creating new quartz endpoint in the recoveryflow. > > * Summary :* > The problem we found is while creating new quartz endpoint the > consumerLoadBalancer is null. > > Please let us know the right way to get the consumerLoadBalancer values > while creating new quartz endpoint in case of recovery flow. > > Thanks, > selva > > > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/Quartz-clustering-in-camel-spring-DSL-JIRA-CAMEL-8076-tp5759589p5759928.html > > Sent from the Camel - Users mailing list archive at Nabble.com. >