I have two routes that I integrated with ZooKeeperRoutePolicy to establish 
basic distributed route election (using Camel 2.13.1).

With a single route, election and failover work fine.
When I set up multiple routes, each with its own ZooKeeperRoutePolicy instance, 
one route almost always gets shut down immediately, but not 100% of the time.
The behavior seems nondeterministic, like a race condition.

Each RouteBuilder instance has its own ZooKeeperRoutePolicy instance. The 
policies use the same ZooKeeper server but different znodes:
Route 1: zookeeper URI: localhost:2181/scheduler-node-election/test
Route 2: zookeeper URI: localhost:2181/scheduler-node-election/googleplay
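
For reference, here is a minimal sketch of how the two election URIs are put 
together, mirroring the string concatenation in my route policy code further 
down. The class name, the helper method, and the base URI value 
zookeeper://localhost:2181/ are assumptions made for illustration only; they 
are not part of Camel.

```java
// Hypothetical helper (not a Camel API): builds the per-vendor election URI.
final class ElectionUriSketch {

    // Assumed base URI. The real code appends the path directly, so the
    // trailing slash matters there; this sketch adds it if it is missing.
    static final String ZOOKEEPER_URI = "zookeeper://localhost:2181/";

    static String electionUri(String baseUri, String vendor) {
        String base = baseUri.endsWith("/") ? baseUri : baseUri + "/";
        return base + "scheduler-node-election/" + vendor;
    }

    public static void main(String[] args) {
        // One URI per route, same server, different znode.
        System.out.println(electionUri(ZOOKEEPER_URI, "test"));
        System.out.println(electionUri(ZOOKEEPER_URI, "googleplay"));
    }
}
```

Each route therefore points at its own znode under /scheduler-node-election, 
so the two elections should not share candidate nodes.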

In the zookeeper CLI:
[zk: localhost:2181(CONNECTED) 6] ls /scheduler-node-election
[googleplay, test]

(I have one zookeeper node running)


Most of the time, one route (either one) just gets shut down during Spring 
initialization while the other one runs. Once in a while, both routes run.
In the run logged below, the “googleplay” route is shut down and the 
associated session is closed.

Here’s the Camel/Zookeeper logging:
[timer://test-#2] org.apache.zookeeper.ZooKeeper           : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=5000 watcher=org.apache.camel.component.zookeeper.ConnectionHolder@733ed393
[//googleplay-#1] org.apache.zookeeper.ZooKeeper           : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=5000 watcher=org.apache.camel.component.zookeeper.ConnectionHolder@21b49301
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x146de3b813e0032, negotiated timeout = 5000
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x146de3b813e0031, negotiated timeout = 5000
[timer://test-#2] o.a.c.c.zookeeper.ZookeeperProducer      : Node '/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690' did not exist, creating it.
[//googleplay-#1] o.a.c.c.zookeeper.ZookeeperProducer      : Node '/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db' did not exist, creating it.
[timer://test-#2] o.a.c.c.z.policy.ZooKeeperElection       : Candidate node '/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690' has been created
[//googleplay-#1] o.a.c.c.z.policy.ZooKeeperElection       : Candidate node '/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db' has been created
[//googleplay-#1] o.a.camel.spring.SpringCamelContext      : Route: election-route-localhos started and consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy   : Starting to graceful shutdown 1 routes (timeout 300 seconds)
[ShutdownTask-#4] o.a.camel.impl.DefaultShutdownStrategy   : Route: election-route-localhos shutdown complete, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy   : Graceful shutdown of 1 routes completed in 0 seconds
[timer://test-#2] o.a.camel.spring.SpringCamelContext      : Route: election-route-localhos is stopped, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[-#1-EventThread] org.apache.zookeeper.ClientCnxn          : EventThread shut down
[timer://test-#2] org.apache.zookeeper.ZooKeeper           : Session: 0x146de3b813e0031 closed
[timer://test-#2] o.a.camel.spring.SpringCamelContext      : Route: election-route-localhos is shutdown and removed, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[timer://test-#2] o.a.camel.spring.SpringCamelContext      : Route: election-route-localhos started and consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/test]
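
One detail in the log above may be relevant: both ZooKeeper election routes 
are reported under the same id, election-route-localhos, and both candidate 
node names begin with "localhost-". The sketch below is only a guess at how 
such an id could come about, assuming it is a fixed prefix plus the first 
eight characters of the candidate name. I have not confirmed this against the 
Camel source, and the class and method names are made up for illustration.

```java
// Hypothetical reconstruction of the id seen in the log; not Camel code.
final class ElectionRouteIdSketch {

    // Assumption: id = "election-route-" + first 8 chars of the candidate name.
    static String electionRouteId(String candidateName) {
        int end = Math.min(8, candidateName.length());
        return "election-route-" + candidateName.substring(0, end);
    }

    public static void main(String[] args) {
        // Candidate node names copied from the log above.
        String testCandidate = "localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690";
        String googleplayCandidate = "localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db";

        String testId = electionRouteId(testCandidate);
        String googleplayId = electionRouteId(googleplayCandidate);

        System.out.println(testId);
        System.out.println(googleplayId);
        System.out.println("same id: " + testId.equals(googleplayId));
    }
}
```

If that assumption holds, two policies on the same host would register 
election routes with colliding ids, which might explain why starting one 
election route shuts down and removes the other.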



My route builder code is below.  I call it twice, once for vendor = “test”, and 
once for vendor = “googleplay”.

    private RouteBuilder scheduledEventBatchExecutionTimerRoute(
            final String vendor,
            final int batchSize,
            final int timerPeriodSeconds,
            final boolean autoStart) {

        log.info("scheduledEventBatchExecutionTimerRoute: vendor={}, doStart={}, queryInterval={}s, batchSize={}",
                vendor, autoStart, timerPeriodSeconds, batchSize);

        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                // Poll on a timer; the election policy decides whether this node runs the route.
                from("timer://" + vendor + "?period=" + timerPeriodSeconds + "s")
                    .autoStartup(autoStart)
                    .routePolicy(electionRoutePolicy(vendor))
                    .setHeader("vendor", constant(vendor))
                    .setHeader("batchSize", constant(batchSize))
                    .beanRef("scheduleEventRepository",
                            "findExecutableEvents(entitlement.create.${in.header.vendor}, ${in.header.batchSize})")
                    .choice()
                        .when().simple("${body.size} == 0")
                            .log(LoggingLevel.DEBUG, "Route: vendor=${in.header.vendor}: No items to process")
                        .when().simple("${body.size} > 0")
                            .to(ROUTE_DIRECT_EVENT_BATCH_EXECUTION + "-" + vendor)
                    .endChoice()
                    .routeId(vendor + "-timer-execution-route");
            }
        };
    }

    private RoutePolicy electionRoutePolicy(String vendor) {

        RoutePolicy routePolicy;

        if (doUseDistributedRouteElection) {
            log.info("Creating zookeeper distributed route election policy for {}", vendor);
            // Ensure that only one node is processing work for the specified vendor.
            // See http://camel.apache.org/zookeeper.html - distributed route policy.
            routePolicy = new ZooKeeperRoutePolicy(zookeeperUri + "scheduler-node-election/" + vendor, 1);
        } else {
            // Just return a default route policy that does nothing.
            log.info("No-op route policy for {}", vendor);
            routePolicy = new RoutePolicySupport() {
                @Override
                public void onInit(Route route) {
                    super.onInit(route);
                }
            };
        }

        return routePolicy;
    }






