Continued work on new LOST behavior. Added some tests. To get correct behavior it's necessary to not retry connection failures. Retrying connection failures was never a good idea and here's a good opportunity to fix it as this requires client action to enable
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62f3c33c Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62f3c33c Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62f3c33c Branch: refs/heads/CURATOR-3.0 Commit: 62f3c33cdb556eccf6fe1cc87ee74b3458431777 Parents: 2343daf Author: randgalt <randg...@apache.org> Authored: Fri Aug 21 17:35:44 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Aug 21 17:35:44 2015 -0500 ---------------------------------------------------------------------- .../org/apache/curator/ConnectionState.java | 24 ++--- .../apache/curator/CuratorZookeeperClient.java | 58 +++++++----- .../main/java/org/apache/curator/RetryLoop.java | 12 +++ .../framework/CuratorFrameworkFactory.java | 2 +- .../framework/imps/CuratorFrameworkImpl.java | 43 ++++++++- .../framework/state/ConnectionState.java | 5 + .../framework/state/ConnectionStateManager.java | 13 ++- .../framework/imps/TestBlockUntilConnected.java | 1 + .../imps/TestEnabledSessionExpiredState.java | 99 ++++++++++++++++++++ .../apache/curator/test/BaseClassForTests.java | 37 +++++++- .../java/org/apache/curator/test/Timing.java | 2 +- 11 files changed, 253 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java index d3900a1..1dfdbef 100644 --- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java +++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java @@ -171,6 +171,18 @@ class ConnectionState implements Watcher, Closeable return ensembleProvider; } + synchronized void reset() throws Exception + { + log.debug("reset"); + + instanceIndex.incrementAndGet(); + + isConnected.set(false); + connectionStartMs = System.currentTimeMillis(); + zooKeeper.closeAndReset(); + zooKeeper.getZooKeeper(); // initiate connection + } + private synchronized void checkTimeouts() throws Exception { int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs); @@ -206,18 +218,6 @@ class ConnectionState implements Watcher, Closeable } } - private synchronized void reset() throws Exception - { - log.debug("reset"); - - instanceIndex.incrementAndGet(); - - isConnected.set(false); - connectionStartMs = System.currentTimeMillis(); - zooKeeper.closeAndReset(); - zooKeeper.getZooKeeper(); // initiate connection - } - private boolean checkState(Event.KeeperState state, boolean wasConnected) { boolean isConnected = wasConnected; http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java index 09b28b2..fbb2f4c 100644 --- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java +++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator; import com.google.common.base.Preconditions; @@ -43,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference; @SuppressWarnings("UnusedDeclaration") public class CuratorZookeeperClient implements Closeable { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final ConnectionState state; - private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>(); - private final int connectionTimeoutMs; - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver()); + private final Logger log = LoggerFactory.getLogger(getClass()); + private final ConnectionState state; + private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>(); + private final int connectionTimeoutMs; + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver()); /** * @@ -159,7 +160,7 @@ public class CuratorZookeeperClient implements Closeable Preconditions.checkState(started.get(), "Client is not started"); log.debug("blockUntilConnectedOrTimedOut() start"); - TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut"); + TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut"); internalBlockUntilConnectedOrTimedOut(); @@ -176,7 +177,7 @@ public class CuratorZookeeperClient implements Closeable * * @throws IOException errors */ - public void start() throws Exception + public void start() throws Exception { log.debug("Starting"); @@ -192,7 +193,7 @@ public class CuratorZookeeperClient implements Closeable /** * Close the client */ - public void close() + public void close() { log.debug("Closing"); @@ -212,7 +213,7 @@ public class CuratorZookeeperClient implements Closeable * * @param policy new policy */ - public void setRetryPolicy(RetryPolicy policy) + public void setRetryPolicy(RetryPolicy policy) { Preconditions.checkNotNull(policy, "policy cannot be null"); @@ -234,7 +235,7 @@ public class CuratorZookeeperClient implements Closeable * @param name name of the event * @return the new tracer ({@link TimeTrace#commit()} must be called) */ - public TimeTrace startTracer(String name) + public TimeTrace startTracer(String name) { return new TimeTrace(name, tracer.get()); } @@ -244,7 +245,7 @@ public class CuratorZookeeperClient implements Closeable * * @return tracing driver */ - public TracerDriver getTracerDriver() + public TracerDriver getTracerDriver() { return tracer.get(); } @@ -254,7 +255,7 @@ public class CuratorZookeeperClient implements Closeable * * @param tracer new tracing driver */ - public void setTracerDriver(TracerDriver tracer) + public void setTracerDriver(TracerDriver tracer) { this.tracer.set(tracer); } @@ -265,7 +266,7 @@ public class CuratorZookeeperClient implements Closeable * * @return connection string */ - public String getCurrentConnectionString() + public String getCurrentConnectionString() { return state.getEnsembleProvider().getConnectionString(); } @@ -281,6 +282,16 @@ public class CuratorZookeeperClient implements Closeable } /** + * For internal use only - reset the internally managed ZK handle + * + * @throws Exception errors + */ + public void reset() throws Exception + { + state.reset(); + } + + /** * Every time a new {@link ZooKeeper} instance is allocated, the "instance index" * is incremented. * @@ -291,22 +302,27 @@ public class CuratorZookeeperClient implements Closeable return state.getInstanceIndex(); } - void addParentWatcher(Watcher watcher) + public boolean retryConnectionTimeouts() + { + return true; + } + + void addParentWatcher(Watcher watcher) { state.addParentWatcher(watcher); } - void removeParentWatcher(Watcher watcher) + void removeParentWatcher(Watcher watcher) { state.removeParentWatcher(watcher); } void internalBlockUntilConnectedOrTimedOut() throws InterruptedException { - long waitTimeMs = connectionTimeoutMs; + long waitTimeMs = connectionTimeoutMs; while ( !state.isConnected() && (waitTimeMs > 0) ) { - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); Watcher tempWatcher = new Watcher() { @Override @@ -315,9 +331,9 @@ public class CuratorZookeeperClient implements Closeable latch.countDown(); } }; - + state.addParentWatcher(tempWatcher); - long startTimeMs = System.currentTimeMillis(); + long startTimeMs = System.currentTimeMillis(); try { latch.await(1, TimeUnit.SECONDS); @@ -326,7 +342,7 @@ public class CuratorZookeeperClient implements Closeable { state.removeParentWatcher(tempWatcher); } - long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs); + long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs); waitTimeMs -= elapsed; } } http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/RetryLoop.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java index 065ebef..8d77cf7 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java +++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java @@ -98,11 +98,17 @@ public class RetryLoop { T result = null; RetryLoop retryLoop = client.newRetryLoop(); + boolean connectionFailed = false; while ( retryLoop.shouldContinue() ) { try { client.internalBlockUntilConnectedOrTimedOut(); + if ( !client.isConnected() && !client.retryConnectionTimeouts() ) + { + connectionFailed = true; + break; + } result = proc.call(); retryLoop.markComplete(); @@ -112,6 +118,12 @@ public class RetryLoop retryLoop.takeException(e); } } + + if ( connectionFailed ) + { + throw new KeeperException.ConnectionLossException(); + } + return result; } http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 6209b06..fad4fc2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -117,7 +117,7 @@ public class CuratorFrameworkFactory private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; private boolean useContainerParentsIfAvailable = true; - private boolean enableSessionExpiredState = false; + private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state"); /** * Apply the current values and build a new CuratorFramework http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index c64fb8f..c359fdc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -61,6 +61,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class CuratorFrameworkImpl implements CuratorFramework @@ -84,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; private final boolean enableSessionExpiredState; + private final AtomicLong currentInstanceIndex = new AtomicLong(-1); private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -104,15 +106,31 @@ public class CuratorFrameworkImpl implements CuratorFramework public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) { ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory()); - this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher() + this.client = new CuratorZookeeperClient + ( + localZookeeperFactory, + builder.getEnsembleProvider(), + builder.getSessionTimeoutMs(), + builder.getConnectionTimeoutMs(), + new Watcher() + { + @Override + public void process(WatchedEvent watchedEvent) + { + CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); + processEvent(event); + } + }, + builder.getRetryPolicy(), + builder.canBeReadOnly() + ) { @Override - public void process(WatchedEvent watchedEvent) + public boolean retryConnectionTimeouts() { - CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); - processEvent(event); + return !enableSessionExpiredState; } - }, builder.getRetryPolicy(), builder.canBeReadOnly()); + }; listeners = new ListenerContainer<CuratorListener>(); unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>(); @@ -675,14 +693,29 @@ public class CuratorFrameworkImpl implements CuratorFramework } else if ( state == Watcher.Event.KeeperState.SyncConnected ) { + checkNewConnection(); connectionStateManager.addStateChange(ConnectionState.RECONNECTED); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { + checkNewConnection(); connectionStateManager.addStateChange(ConnectionState.READ_ONLY); } } + private void checkNewConnection() + { + if ( enableSessionExpiredState ) + { + long instanceIndex = client.getInstanceIndex(); + long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex); + if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this + { + connectionStateManager.addStateChange(ConnectionState.LOST); + } + } + } + Watcher.Event.KeeperState codeToState(KeeperException.Code code) { switch ( code ) http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java index 49d0044..79f3b62 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java @@ -86,6 +86,11 @@ public enum ConnectionState * b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout * elapses during a network partition. * </p> + * + * <p> + * NOTE: the new behavior for the LOST state can also be enabled via the command line + * property "curator-enable-session-expired-state" (e.g. -Dcurator-enable-session-expired-state=true) + * </p> */ LOST { http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index c0feb84..553faac 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -251,7 +251,8 @@ public class ConnectionStateManager implements Closeable { while ( !Thread.currentThread().isInterrupted() ) { - final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS); + int pollMaxMs = (sessionTimeoutMs * 2) / 3; // 2/3 of session timeout + final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS); if ( newState != null ) { if ( listeners.size() == 0 ) @@ -294,7 +295,15 @@ public class ConnectionStateManager implements Closeable long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch; if ( elapsedMs >= sessionTimeoutMs ) { - log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs)); + log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs)); + try + { + client.getZookeeperClient().reset(); + } + catch ( Exception e ) + { + log.error("Could not reset the connection", e); + } addStateChange(ConnectionState.LOST); } } http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java index f649afb..eeec797 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java @@ -114,6 +114,7 @@ public class TestBlockUntilConnected extends BaseClassForTests Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). + sessionTimeoutMs(timing.session()). retryPolicy(new RetryOneTime(1)). build(); http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java new file mode 100644 index 0000000..030a292 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java @@ -0,0 +1,99 @@ +package org.apache.curator.framework.imps; + +import com.google.common.collect.Queues; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.KillSession; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public class TestEnabledSessionExpiredState extends BaseClassForTests +{ + private final Timing timing = new Timing(); + + private CuratorFramework client; + private BlockingQueue<ConnectionState> states; + + @BeforeMethod + @Override + public void setup() throws Exception + { + super.setup(); + + client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .enableSessionExpiredState() + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + + states = Queues.newLinkedBlockingQueue(); + ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + states.add(newState); + } + }; + client.getConnectionStateListenable().addListener(listener); + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + CloseableUtils.closeQuietly(client); + + super.teardown(); + } + + @Test + public void testKillSession() throws Exception + { + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED); + + KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); + + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST); + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED); + } + + @Test + public void testReconnectWithoutExpiration() throws Exception + { + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED); + server.stop(); + try + { + client.checkExists().forPath("/"); // any API call that will invoke the retry policy, etc. + } + catch ( KeeperException.ConnectionLossException ignore ) + { + } + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); + server.restart(); + client.checkExists().forPath("/"); + Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED); + } + + @Override + protected boolean enabledSessionExpiredStateAware() + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index 6ef3bb0..c9f3524 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -16,10 +16,16 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.IInvokedMethod; +import org.testng.IInvokedMethodListener; import org.testng.IRetryAnalyzer; import org.testng.ITestContext; +import org.testng.ITestNGListener; import org.testng.ITestNGMethod; import org.testng.ITestResult; import org.testng.annotations.AfterMethod; @@ -32,11 +38,13 @@ import java.util.concurrent.atomic.AtomicBoolean; public class BaseClassForTests { protected TestingServer server; + private final Logger log = LoggerFactory.getLogger(getClass()); - private static final int RETRY_WAIT_MS = 5000; + private static final int RETRY_WAIT_MS = 5000; private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES; private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND; private static final String INTERNAL_RETRY_FAILED_TESTS; + static { String logConnectionIssues = null; @@ -70,8 +78,30 @@ public class BaseClassForTests @BeforeSuite(alwaysRun = true) public void beforeSuite(ITestContext context) { + if ( !enabledSessionExpiredStateAware() ) + { + ITestNGListener listener = new IInvokedMethodListener() + { + @Override + public void beforeInvocation(IInvokedMethod method, ITestResult testResult) + { + int invocationCount = method.getTestMethod().getCurrentInvocationCount(); + System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount == 1)); + log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount == 1)); + } + + @Override + public void afterInvocation(IInvokedMethod method, ITestResult testResult) + { + System.clearProperty("curator-enable-session-expired-state"); + } + }; + context.getSuite().addListener(listener); + } + for ( ITestNGMethod method : context.getAllTestMethods() ) { + method.setInvocationCount(enabledSessionExpiredStateAware() ? 1 : 2); method.setRetryAnalyzer(new RetryTest()); } } @@ -117,6 +147,11 @@ public class BaseClassForTests } } + protected boolean enabledSessionExpiredStateAware() + { + return false; + } + private static class RetryTest implements IRetryAnalyzer { private final AtomicBoolean hasBeenRetried = new AtomicBoolean(!Boolean.getBoolean(INTERNAL_RETRY_FAILED_TESTS)); http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/Timing.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java index f29b1c5..753d62d 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Timing.java +++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java @@ -34,7 +34,7 @@ public class Timing private static final int DEFAULT_SECONDS = 10; private static final int DEFAULT_WAITING_MULTIPLE = 5; - private static final double SESSION_MULTIPLE = .25; + private static final double SESSION_MULTIPLE = 1.5; /** * Use the default base time