[ https://issues.apache.org/jira/browse/CURATOR-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826385#comment-15826385 ]
Running Fly commented on CURATOR-320: ------------------------------------- If it helps I did write some cod to treat the symptoms. I never felt like I had a good enough understanding of Curator to create a proper patch. If it helps these are the workaround changes I made to "org.apache.curator.x.discovery.details.ServiceDiscoveryImpl". It doesn't completely eliminate the problem but It does help it recover and avoid it from getting stuck in an indeterminate loop. It essentially aborts the reregister if the connection drops and start over. Every time it fails it waits a little longer before retrying. This helps to soften the load on the ZK server during a sudden connection recovery. Its far from ideal but it works and has gotten us by for months. {code} /** * A mechanism to register and query service instances using ZooKeeper @@ -66,17 +70,16 @@ private final Collection<ServiceProvider<T>> providers = Sets .newSetFromMap(Maps.<ServiceProvider<T>, Boolean> newConcurrentMap()); private final boolean watchInstances; + + private ExecutorService reRegisterExecutor = ThreadUtils.newSingleThreadExecutor("reRegister"); + private volatile Future< ? > reRegisterFuture = null; + private int reRegisterRetryCount = 0; private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ((newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED)) { - try { - log.debug("Re-registering due to reconnection"); - reRegisterServices(); - } catch (Exception e) { - ThreadUtils.checkInterrupted(e); - log.error("Could not re-register instances after reconnection", e); - } + log.warn("Reconnection event. Calling re-register."); + reRegisterServices(true); } } }; @@ -118,11 +121,7 @@ */ @Override public void start() throws Exception { - try { - reRegisterServices(); - } catch (KeeperException e) { - log.error("Could not register instances - will try again later", e); - } + reRegisterServices(false); client.getConnectionStateListenable().addListener(connectionStateListener); } @@ -368,10 +367,57 @@ return (entry != null) ? entry.service : null; } - private void reRegisterServices() throws Exception { - for (final Entry<T> entry : services.values()) { - synchronized (entry) { - internalRegisterService(entry.service); + private void reRegisterServices(final boolean concurrent) { + synchronized (reRegisterExecutor) { + if (reRegisterFuture != null) { + reRegisterFuture.cancel(true); + log.warn("Re-register restarting."); + reRegisterRetryCount++; + } + reRegisterFuture = reRegisterExecutor.submit(new Runnable() { + @Override + public void run() { + int count = 0; + try { + if (reRegisterRetryCount > 0) { + int secToWait = reRegisterRetryCount * 5; + secToWait = secToWait < 180 ? secToWait : 180; + log.info("Re-register attempt {} will start in {}s", reRegisterRetryCount, secToWait); + Thread.sleep(secToWait * 1000); + } else { + log.info("Re-register will start immediately"); + } + for (final Entry<T> entry : services.values()) { + synchronized (entry) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + internalRegisterService(entry.service); + } + count++; + } + log.warn( + "Re-registered {} services." + + (reRegisterRetryCount > 0 ? " After {} retries." : ""), + count, reRegisterRetryCount); + synchronized (reRegisterExecutor) { + reRegisterRetryCount = 0; + reRegisterFuture = null; + } + } catch (InterruptedException ie) { + log.warn("Re-register interrupted. After registering {} services.", count); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + log.error("Could not re-register instances after reconnection", e); + } + } + }); + } + if (concurrent == false) { + try { + reRegisterFuture.get(); + } catch (Exception e) { + log.error("Re-register execution exception:", e); } } } {code} > Discovery reregister triggered even if retry policy suceeds. Connection > looping condition. > ------------------------------------------------------------------------------------------ > > Key: CURATOR-320 > URL: https://issues.apache.org/jira/browse/CURATOR-320 > Project: Apache Curator > Issue Type: Bug > Components: Client, Framework > Affects Versions: TBD, 2.10.0 > Environment: 3 server Quorum running on individual AWS boxes. > Session timeout set to 1-2 min on most clients. > Reporter: Running Fly > Fix For: TBD > > > ServiceDiscoveryImpl.reRegisterServices() can be trigger on > ConnectionState events: RECONNECTED and CONNECTED. Causing the > reRegisterServices() method to be run on ConnectionStateManager thread. If a > connection drops while running reRegisterServices() it will be recovered by > the retry policy. However the ConnectionState SUSPENDED followed by > RECONNECTED events will be queued but not fired until reRegisterServices() > completes(ConnectionStateManager Thread fires these events but is in use). > When it does complete the RECONNECTED event in the queue will fire and > reRegisterServices() will rerun. > When zookeeper's server connection is interrupted all of the clients will > simultaneously call reRegisterServices(). This overloads the server with > requests causing connections to timeout and reset. Thus queuing up more > RECONNECTED events. This state can persist indefinitely. > Because the reRegisterServices() will most likely receive a > NodeExistsException. It deletes and recreates the node. Effectively causing > the services to thrash up and down. Wreaking havoc on our service dependency > chain. -- This message was sent by Atlassian JIRA (v6.3.4#6332)