Hi Selva,

Thanks for sharing your patch with us. The patch looks good to me, and I 
managed to reproduce the error by making some changes to 
SpringQuartzConsumerTwoAppsClusteredRecoveryTest. 
I will commit the patch shortly.

Regards,

--  
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On December 2, 2014 at 7:23:03 PM, selva (sgpsel...@gmail.com) wrote:
> Hi Willem,
>  
> I downloaded the source (2.14.x from GitHub), debugged the Camel Quartz
> component, and found the cause of the issue I am facing.
>  
> As I posted earlier, the issue occurs in a clustered environment: the Camel
> application is deployed as a standalone program running as 2 instances. If
> one instance goes down while executing a route, the other instance should
> trigger the same route immediately, because we have configured
> *recoverableJob=true* on the quartz2 endpoint.
>  
> The issue is that the second instance triggers the quartz2 endpoint but
> does not execute my processor (QueryBuilderProcessor).
>  
> Example :
>  
>  
> > uri="quartz2://cluster/quartz?cron=0+0/4+++*+?&durableJob=true&stateful=true&recoverableJob=true">
> >   
>  
>  
>  
>  
> While debugging I found that, in CamelJob.java, the trigger key in the normal
> flow and the trigger key during recovery are different, so the condition
> below fails and execution goes to the else block (please check the
> highlighted else block).
>  
>  
> public void execute(JobExecutionContext context) throws JobExecutionException {
>     Exchange exchange = null;
>     try {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Running CamelJob jobExecutionContext={}", context);
>         }
>
>         CamelContext camelContext = getCamelContext(context);
>         QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, context); // Step 1
>         exchange = endpoint.createExchange();
>         exchange.setIn(new QuartzMessage(exchange, context));
>         endpoint.getConsumerLoadBalancer().process(exchange); // Step 2
>         // ... (rest of the method not shown)
>  
>  
>  
> *Step 1:* Quartz endpoint creation
>
> if (triggerKey.equals(checkTriggerKey)) {
>     return quartzEndpoint;
> }
>
> if (camelContext.hasEndpoint(endpointUri) != null) {
>     if (LOG.isDebugEnabled()) {
>         LOG.debug("Getting Endpoint from camelContext.");
>     }
>     result = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
> } else {
>     // Highlighted: the recovery flow ends up in this else block.
>     LOG.warn("Cannot find existing QuartzEndpoint with uri: {}. Creating new endpoint instance.", endpointUri);
>     result = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
> }
>  
>  
>  
> Since this creates a new Quartz endpoint, its consumerLoadBalancer object is
> null.
>
> So in the next step the code below runs and creates a *new*
> consumerLoadBalancer object.
>  
> *Step 2:*
>
> public LoadBalancer getConsumerLoadBalancer() {
>     if (consumerLoadBalancer == null) {
>         consumerLoadBalancer = new RoundRobinLoadBalancer();
>     }
>     return consumerLoadBalancer;
> }
>  
> Since it is a newly created object, its processors property is empty, so
> my QueryBuilderProcessor is never called.
>
> In the normal (non-recovery) flow I can see the list of processors in the
> processors property of consumerLoadBalancer.
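
To make the effect described above concrete, here is a minimal, self-contained sketch. It does not use Camel or Quartz: SketchLoadBalancer and SketchEndpoint are hypothetical stand-ins, and the only thing they are meant to show is that a lazily created balancer with no registered processors has nothing to call, so the fired message is dropped without any error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a round-robin load balancer (not Camel's class).
class SketchLoadBalancer {
    private final List<Consumer<String>> processors = new ArrayList<>();
    private int next;

    void addProcessor(Consumer<String> processor) {
        processors.add(processor);
    }

    // Returns true if a processor handled the message, false if none is registered.
    boolean process(String message) {
        if (processors.isEmpty()) {
            return false; // nothing to call: the message is silently dropped
        }
        Consumer<String> target = processors.get(next);
        next = (next + 1) % processors.size();
        target.accept(message);
        return true;
    }
}

// Hypothetical stand-in for an endpoint that creates its balancer lazily.
class SketchEndpoint {
    private SketchLoadBalancer consumerLoadBalancer;

    SketchLoadBalancer getConsumerLoadBalancer() {
        if (consumerLoadBalancer == null) {
            consumerLoadBalancer = new SketchLoadBalancer();
        }
        return consumerLoadBalancer;
    }
}

public class LazyBalancerDemo {
    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();

        // Endpoint owned by a started route: its consumer registered a processor.
        SketchEndpoint routeEndpoint = new SketchEndpoint();
        routeEndpoint.getConsumerLoadBalancer().addProcessor(handled::add);

        // Freshly created endpoint, as in the recovery flow: nothing registered.
        SketchEndpoint freshEndpoint = new SketchEndpoint();

        System.out.println(routeEndpoint.getConsumerLoadBalancer().process("fire")); // true
        System.out.println(freshEndpoint.getConsumerLoadBalancer().process("fire")); // false
        System.out.println(handled); // [fire]
    }
}
```

In this sketch the second call returns false because the fresh endpoint was never attached to a consumer, which mirrors the empty processors list seen in the debugger.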
>  
> *Temporary fix for testing:* the code below works in the recovery flow with
> immediate retry.
>
> As a temporary fix we modified the lookupQuartzEndpoint method in
> CamelJob.java as shown below (please check the highlighted text).
>  
> protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext,
>         JobExecutionContext quartzContext) throws JobExecutionException {
>     TriggerKey triggerKey = quartzContext.getTrigger().getKey();
>     // Highlighted change: also read the job detail and its key.
>     JobDetail jobDetail = quartzContext.getJobDetail();
>     JobKey jobKey = jobDetail.getKey();
>
>     if (LOG.isDebugEnabled()) {
>         LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
>     }
>
>     // check all active routes for the quartz endpoint this task matches
>     // as we prefer to use the existing endpoint from the routes
>     for (Route route : camelContext.getRoutes()) {
>         if (route.getEndpoint() instanceof QuartzEndpoint) {
>             QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
>             TriggerKey checkTriggerKey = quartzEndpoint.getTriggerKey();
>             if (LOG.isTraceEnabled()) {
>                 LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);
>             }
>             // Highlighted change: in the recovery flow, also match on the job key.
>             if (triggerKey.equals(checkTriggerKey)
>                     || (jobDetail.requestsRecovery()
>                         && jobKey.getGroup().equals(checkTriggerKey.getGroup())
>                         && jobKey.getName().equals(checkTriggerKey.getName()))) {
>                 return quartzEndpoint;
>             }
>         }
>     }
>     // The fallback lookup by endpointUri (the if/else shown under Step 1) is
>     // omitted here; it assigns `result`.
>     return result;
> }
>  
>  
> Apart from the trigger key, all the remaining job details are the same as in
> the existing QuartzEndpoint, so we added this check to return the existing
> endpoint instead of creating a new one in the recovery flow.
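
The matching rule described above can be sketched in isolation as follows. This is only an illustration under stated assumptions: Key is a hypothetical (group, name) pair standing in for Quartz's TriggerKey and JobKey, matches is a hypothetical helper rather than a method of CamelJob, and the recovery trigger key values are made up for the example.

```java
import java.util.Objects;

public class RecoveryMatchDemo {

    // Hypothetical (group, name) pair standing in for TriggerKey / JobKey.
    static final class Key {
        final String group;
        final String name;

        Key(String group, String name) {
            this.group = group;
            this.name = name;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Key)) {
                return false;
            }
            Key that = (Key) other;
            return group.equals(that.group) && name.equals(that.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, name);
        }
    }

    // True when the fired trigger belongs to the endpoint, either directly
    // (same trigger key) or, for a recoverable job, via the job's group and name.
    static boolean matches(Key firedTrigger, Key job, boolean requestsRecovery, Key endpointTrigger) {
        if (firedTrigger.equals(endpointTrigger)) {
            return true;
        }
        return requestsRecovery
                && job.group.equals(endpointTrigger.group)
                && job.name.equals(endpointTrigger.name);
    }

    public static void main(String[] args) {
        Key endpointTrigger = new Key("cluster", "quartz");
        Key job = new Key("cluster", "quartz");
        Key normalTrigger = new Key("cluster", "quartz");
        // Made-up values: all that matters is that they differ from the endpoint's key.
        Key recoveryTrigger = new Key("some-recovery-group", "recover-1");

        System.out.println(matches(normalTrigger, job, true, endpointTrigger));    // true
        System.out.println(matches(recoveryTrigger, job, true, endpointTrigger));  // true
        System.out.println(matches(recoveryTrigger, job, false, endpointTrigger)); // false
    }
}
```

The third call shows the trade-off in this sketch: without the recovery flag, a differing trigger key still falls through to the fallback lookup, so the extra match only affects jobs that request recovery.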
>  
> *Summary:*
> The problem we found is that when a new Quartz endpoint is created, its
> consumerLoadBalancer is null.
>
> Please let us know the right way to obtain the consumerLoadBalancer values
> when a new Quartz endpoint is created in the recovery flow.
>  
> Thanks,
> selva
>  
>  
>  
>  
>  
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Quartz-clustering-in-camel-spring-DSL-JIRA-CAMEL-8076-tp5759589p5759928.html
>   
> Sent from the Camel - Users mailing list archive at Nabble.com.
>  
