I have two routes that I integrated with ZooKeeperRoutePolicy to establish basic distributed route election (using Camel 2.13.1).
When used with one route instance, election/failover works fine. When I set up multiple routes, each with its own ZooKeeperRoutePolicy instance, one route almost always gets shut down immediately, but not 100% of the time. The behavior seems nondeterministic, like a race condition.

Each RouteBuilder instance has a separate ZooKeeperRoutePolicy instance that uses the same server but a different ZooKeeper znode, like this:

    Route 1: zookeeper URI: localhost:2181/scheduler-node-election/test
    Route 2: zookeeper URI: localhost:2181/scheduler-node-election/googleplay

In the ZooKeeper CLI (I have one ZooKeeper node running):

    [zk: localhost:2181(CONNECTED) 6] ls /scheduler-node-election
    [googleplay, test]

Most of the time, one route (either one) gets shut down during Spring initialization while the other one runs. Once in a while, both routes run.

In the run logged below, the "googleplay" route is shut down and the associated session is closed. Here is the Camel/ZooKeeper logging:

    [timer://test-#2] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=5000 watcher=org.apache.camel.component.zookeeper.ConnectionHolder@733ed393
    [//googleplay-#1] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=5000 watcher=org.apache.camel.component.zookeeper.ConnectionHolder@21b49301
    [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
    [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
    [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x146de3b813e0032, negotiated timeout = 5000
    [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x146de3b813e0031, negotiated timeout = 5000
    [timer://test-#2] o.a.c.c.zookeeper.ZookeeperProducer : Node '/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690' did not exist, creating it.
    [//googleplay-#1] o.a.c.c.zookeeper.ZookeeperProducer : Node '/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db' did not exist, creating it.
    [timer://test-#2] o.a.c.c.z.policy.ZooKeeperElection : Candidate node '/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690' has been created
    [//googleplay-#1] o.a.c.c.z.policy.ZooKeeperElection : Candidate node '/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db' has been created
    [//googleplay-#1] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos started and consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
    [timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy : Starting to graceful shutdown 1 routes (timeout 300 seconds)
    [ShutdownTask-#4] o.a.camel.impl.DefaultShutdownStrategy : Route: election-route-localhos shutdown complete, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
    [timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy : Graceful shutdown of 1 routes completed in 0 seconds
    [timer://test-#2] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos is stopped, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
    [-#1-EventThread] org.apache.zookeeper.ClientCnxn : EventThread shut down
    [timer://test-#2] org.apache.zookeeper.ZooKeeper : Session: 0x146de3b813e0031 closed
    [timer://test-#2] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos is shutdown and removed, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
    [timer://test-#2] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos started and consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/test]

My route builder code is below. I call it twice, once for vendor = "test" and once for vendor = "googleplay".

    private RouteBuilder scheduledEventBatchExecutionTimerRoute(
            final String vendor,
            final int batchSize,
            final int timerPeriodSeconds,
            final boolean autoStart) {
        log.info("scheduledEventBatchExecutionTimerRoute: vendor={}, doStart={}, queryInterval={}s, batchSize={}",
                vendor, autoStart, timerPeriodSeconds, batchSize);

        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("timer://" + vendor + "?period=" + timerPeriodSeconds + "s")
                    .autoStartup(autoStart)
                    .routePolicy(electionRoutePolicy(vendor))
                    .setHeader("vendor", constant(vendor))
                    .setHeader("batchSize", constant(batchSize))
                    .beanRef("scheduleEventRepository",
                        "findExecutableEvents(entitlement.create.${in.header.vendor}, ${in.header.batchSize})")
                    .choice()
                        .when().simple("${body.size} == 0")
                            .log(LoggingLevel.DEBUG, "Route: vendor=${in.header.vendor}: No items to process")
                        .when().simple("${body.size} > 0")
                            .to(ROUTE_DIRECT_EVENT_BATCH_EXECUTION + "-" + vendor)
                    .endChoice()
                    .routeId(vendor + "-timer-execution-route");
            }
        };
    }

    private RoutePolicy electionRoutePolicy(String vendor) {
        RoutePolicy routePolicy;
        if (doUseDistributedRouteElection) {
            log.info("Creating zookeeper distributed route election policy for {}", vendor);
            // Ensure that only one node is processing work for the specified vendor.
            // See http://camel.apache.org/zookeeper.html - distributed route policy.
            routePolicy = new ZooKeeperRoutePolicy(zookeeperUri + "scheduler-node-election/" + vendor, 1);
        } else {
            // Just return a default route policy that does nothing.
            log.info("No-op route policy for {}", vendor);
            routePolicy = new RoutePolicySupport() {
                @Override
                public void onInit(Route route) {
                    super.onInit(route);
                }
            };
        }
        return routePolicy;
    }
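
To make the setup concrete, here is a minimal standalone sketch of how the two election URIs are derived from the vendor name, using the same string concatenation as electionRoutePolicy(vendor). It uses no Camel classes. The class name ElectionUriSketch and the value of ZOOKEEPER_URI are assumptions for illustration: the actual zookeeperUri value is not shown in my code above, only that the endpoints resolve to 127.0.0.1:2181.

```java
public class ElectionUriSketch {
    // Assumed base URI (hypothetical value); the real zookeeperUri field is not shown in the post.
    static final String ZOOKEEPER_URI = "zookeeper://127.0.0.1:2181/";

    // Same concatenation as in electionRoutePolicy(vendor).
    static String electionUri(String zookeeperUri, String vendor) {
        return zookeeperUri + "scheduler-node-election/" + vendor;
    }

    public static void main(String[] args) {
        // The route builder is called once per vendor, so each policy gets its own znode path.
        for (String vendor : new String[] {"test", "googleplay"}) {
            System.out.println(vendor + " -> " + electionUri(ZOOKEEPER_URI, vendor));
        }
    }
}
```

Under that assumption, the two policies point at the same server but at different znodes, which matches the endpoints reported in the log.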