I was doing something similar, and what I noticed in JConsole was that
there was only ever one zookeeper election route created even when I had
multiple route policies. There are two similar Jira issues related to
this: CAMEL-7501 and CAMEL-5627. They might be the source of your trouble.

Hope that helps. If it really is a bug in the zk routing policy then that
might save you some hair-pulling in your own code.

-Scott


On Fri, Jun 27, 2014 at 5:41 PM, Michael Moores <mmoo...@realnetworks.com>
wrote:

> I tried an experiment that seems to point to a race condition in the Camel
> ZooKeeper component.
>
> I initialized the routes with autoStart=false, then started each route
> manually.
> When I do this everything works fine - no immediate shutdown of one of the
> routes.. I think it might have something to do with
> ZooKeeperElection.awaitElectionResults(), but not sure.
>
>
>
> On 6/27/14, 2:00 PM, "Michael Moores" <mmoo...@realnetworks.com> wrote:
>
> >I have two routes that I integrated with ZooKeeperRoutePolicy to
> >establish basic distributed route election.
> >(Using camel 2.13.1)
> >
> >When used with one route instance, election/failover works fine.
> >When I set up multiple routes with multiple instances of
> >ZooKeeperRoutePolicy, one route almost always gets immediately shut down
> >but NOT 100% of the time.
> >The behavior seems nondeterministic- like a race condition.
> >
> >Each RouteBuilder instance has a separate instance of a
> >ZooKeeperRoutePolicy that uses the same server but a different zookeeper
> >znode:
> >Like this:
> >Route 1: zookeeper URI: localhost:2181/scheduler-node-election/test
> >Route 2: zookeeper URI: localhost:2181/scheduler-node-election/googleplay
> >
> >In the zookeeper CLI:
> >[zk: localhost:2181(CONNECTED) 6] ls /scheduler-node-election
> >[googleplay, test]
> >
> >(I have one zookeeper node running)
> >
> >
> >I¹m seeing that most of the time one route (either one) just gets
> >shutdown during  Spring initialization, while the other one runs.  But
> >once in a while, both routes will run.
> >In this case the ³googleplay² route is shut down and the associated
> >session is closed.
> >
> >Here¹s the Camel/Zookeeper logging:
> >[timer://test-#2] org.apache.zookeeper.ZooKeeper           : Initiating
> >client connection, connectString=127.0.0.1:2181 sessionTimeout=5000
> >watcher=org.apache.camel.component.zookeeper.ConnectionHolder@733ed393
> >[//googleplay-#1] org.apache.zookeeper.ZooKeeper           : Initiating
> >client connection, connectString=127.0.0.1:2181 sessionTimeout=5000
> >watcher=org.apache.camel.component.zookeeper.ConnectionHolder@21b49301
> >[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Opening
> >socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
> >authenticate using SASL (unknown error)
> >[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Opening
> >socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
> >authenticate using SASL (unknown error)
> >[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Socket
> >connection established to 127.0.0.1/127.0.0.1:2181, initiating session
> >[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Socket
> >connection established to 127.0.0.1/127.0.0.1:2181, initiating session
> >[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Session
> >establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid =
> >0x146de3b813e0032, negotiated timeout = 5000
> >[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Session
> >establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid =
> >0x146de3b813e0031, negotiated timeout = 5000
> >[timer://test-#2] o.a.c.c.zookeeper.ZookeeperProducer      : Node
> >'/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623
> >690' did not exist, creating it.
> >[//googleplay-#1] o.a.c.c.zookeeper.ZookeeperProducer      : Node
> >'/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44
> >e008854db' did not exist, creating it.
> >[timer://test-#2] o.a.c.c.z.policy.ZooKeeperElection       : Candidate
> >node
> >'/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623
> >690' has been created
> >[//googleplay-#1] o.a.c.c.z.policy.ZooKeeperElection       : Candidate
> >node
> >'/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44
> >e008854db' has been created
> >[//googleplay-#1] o.a.camel.spring.SpringCamelContext      : Route:
> >election-route-localhos started and consuming from:
> >Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
> >[timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy   : Starting to
> >graceful shutdown 1 routes (timeout 300 seconds)
> >[ShutdownTask-#4] o.a.camel.impl.DefaultShutdownStrategy   : Route:
> >election-route-localhos shutdown complete, was consuming from:
> >Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
> >[timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy   : Graceful
> >shutdown of 1 routes completed in 0 seconds
> >[timer://test-#2] o.a.camel.spring.SpringCamelContext      : Route:
> >election-route-localhos is stopped, was consuming from:
> >Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
> >[-#1-EventThread] org.apache.zookeeper.ClientCnxn          : EventThread
> >shut down
> >[timer://test-#2] org.apache.zookeeper.ZooKeeper           : Session:
> >0x146de3b813e0031 closed
> >[timer://test-#2] o.a.camel.spring.SpringCamelContext      : Route:
> >election-route-localhos is shutdown and removed, was consuming from:
> >Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
> >[timer://test-#2] o.a.camel.spring.SpringCamelContext      : Route:
> >election-route-localhos started and consuming from:
> >Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/test]
> >
> >
> >
> >My route builder code is below.  I call it twice, once for vendor =
> >³test², and once for vendor = ³googleplay².
> >
> >private RouteBuilder scheduledEventBatchExecutionTimerRoute(
> >            final String vendor,
> >            final int batchSize,
> >            final int timerPeriodSeconds,
> >            final boolean autoStart) {
> >
> >        log.info("scheduledEventBatchExecutionTimerRoute: vendor={},
> >doStart={}, queryInterval={}s, batchSize={}",
> >                vendor, autoStart, timerPeriodSeconds, batchSize);
> >
> >        return new RouteBuilder() {
> >            @Override
> >            public void configure() throws Exception {
> >
> >                from("timer://" + vendor + "?period=" +
> >timerPeriodSeconds + "s")
> >                        .autoStartup(autoStart)
> >                .routePolicy(electionRoutePolicy(vendor))
> >                .setHeader("vendor",
> >constant(vendor)).setHeader("batchSize", constant(batchSize))
> >                .beanRef("scheduleEventRepository",
> >"findExecutableEvents(entitlement.create.${in.header.vendor},
> >${in.header.batchSize})")
> >
> >                .choice()
> >                    .when().simple("${body.size} == 0")
> >                        .log(LoggingLevel.DEBUG, "Route:
> >vendor=${in.header.vendor}: No items to process")
> >                    .when().simple("${body.size} > 0")
> >                        .to(ROUTE_DIRECT_EVENT_BATCH_EXECUTION + "-" +
> >vendor)
> >
> >                .endChoice()
> >                .routeId(vendor + "-timer-execution-route");
> >
> >            }
> >        };
> >
> >    }
> >
> >    private RoutePolicy electionRoutePolicy(String vendor) {
> >
> >        RoutePolicy routePolicy;
> >
> >        if (doUseDistributedRouteElection) {
> >            log.info("Creating zookeeper distributed route election
> >policy for {}", vendor);
> >            // Ensure that only one node is processing work for the
> >specified vendor.
> >            // See http://camel.apache.org/zookeeper.html - distributed
> >route policy.
> >            routePolicy = new ZooKeeperRoutePolicy(zookeeperUri +
> >"scheduler-node-election/" + vendor, 1);
> >
> >     } else {
> >            // Just return a default route policy that does nothing.
> >            log.info("No-op route policy for {}", vendor);
> >            routePolicy = new RoutePolicySupport() {
> >                @Override
> >                public void onInit(Route route) {
> >                    super.onInit(route);
> >                }
> >            };
> >        }
> >
> >        return routePolicy;
> >    }
> >
> >
> >
> >
> >
> >
> >
>
>


-- 
Scott Stults | Founder & Solutions Architect | OpenSource Connections, LLC
| 434.409.2780
http://www.opensourceconnections.com

Reply via email to